Compare commits
6 Commits
v0.31.1
...
4e78d83092
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e78d83092 | ||
|
|
0e6331fa6a | ||
|
|
c98c5df63c | ||
|
|
0981c74da9 | ||
|
|
31518799f6 | ||
|
|
e8b4b9b48e |
12
README.md
12
README.md
@@ -10,6 +10,7 @@ This is a pip package that can be installed into any project and covers the foll
|
||||
- logging update with exception logs
|
||||
- requests wrapper for easier auth pass on access
|
||||
- dict fingerprinting
|
||||
- sending email
|
||||
- jmespath search
|
||||
- json helpers for conten replace and output
|
||||
- dump outputs for data for debugging
|
||||
@@ -26,6 +27,7 @@ This is a pip package that can be installed into any project and covers the foll
|
||||
- debug_handling: various debug helpers like data dumper, timer, utilization, etc
|
||||
- db_handling: SQLite interface class
|
||||
- encyption_handling: symmetric encryption
|
||||
- email_handling: simple email sending
|
||||
- file_handling: crc handling for file content and file names, progress bar
|
||||
- json_handling: jmespath support and json date support, replace content in dict with json paths
|
||||
- iterator_handling: list and dictionary handling support (search, fingerprinting, etc)
|
||||
@@ -50,7 +52,7 @@ Have the following setup in `project.toml`
|
||||
|
||||
```toml
|
||||
[[tool.uv.index]]
|
||||
name = "egra-gitea"
|
||||
name = "opj-pypi"
|
||||
url = "https://git.egplusww.jp/api/packages/PyPI/pypi/simple/"
|
||||
publish-url = "https://git.egplusww.jp/api/packages/PyPI/pypi"
|
||||
explicit = true
|
||||
@@ -58,15 +60,15 @@ explicit = true
|
||||
|
||||
```sh
|
||||
uv build
|
||||
uv publish --index egra-gitea --token <gitea token>
|
||||
uv publish --index opj-pypi --token <gitea token>
|
||||
```
|
||||
|
||||
## Test package
|
||||
## Use package
|
||||
|
||||
We must set the full index URL here because we run with "--no-project"
|
||||
|
||||
```sh
|
||||
uv run --with corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/ --no-project -- python -c "import corelibs"
|
||||
uv run --with corelibs --index opj-pypi=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/ --no-project -- python -c "import corelibs"
|
||||
```
|
||||
|
||||
### Python tests
|
||||
@@ -99,7 +101,7 @@ uv run test-run/<script>
|
||||
This will also add the index entry
|
||||
|
||||
```sh
|
||||
uv add corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/
|
||||
uv add corelibs --index opj-pypi=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/
|
||||
```
|
||||
|
||||
## Python venv setup
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# MARK: Project info
|
||||
[project]
|
||||
name = "corelibs"
|
||||
version = "0.31.1"
|
||||
version = "0.33.0"
|
||||
description = "Collection of utils for Python scripts"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
@@ -17,7 +17,7 @@ dependencies = [
|
||||
|
||||
# MARK: build target
|
||||
[[tool.uv.index]]
|
||||
name = "egra-gitea"
|
||||
name = "opj-pypi"
|
||||
url = "https://git.egplusww.jp/api/packages/PyPI/pypi/simple/"
|
||||
publish-url = "https://git.egplusww.jp/api/packages/PyPI/pypi"
|
||||
explicit = true
|
||||
@@ -63,12 +63,13 @@ ignore = [
|
||||
[tool.pylint.MASTER]
|
||||
# this is for the tests/etc folders
|
||||
init-hook='import sys; sys.path.append("src/")'
|
||||
|
||||
# MARK: Testing
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = [
|
||||
"tests",
|
||||
]
|
||||
|
||||
|
||||
[tool.coverage.run]
|
||||
omit = [
|
||||
"*/tests/*",
|
||||
|
||||
@@ -159,10 +159,14 @@ def parse_flexible_date(
|
||||
|
||||
# Try different parsing methods
|
||||
parsers: list[Callable[[str], datetime]] = [
|
||||
# ISO 8601 format
|
||||
# ISO 8601 format, also with missing "T"
|
||||
lambda x: datetime.fromisoformat(x), # pylint: disable=W0108
|
||||
lambda x: datetime.fromisoformat(x.replace(' ', 'T')), # pylint: disable=W0108
|
||||
# Simple date format
|
||||
lambda x: datetime.strptime(x, "%Y-%m-%d"),
|
||||
# datetime without T
|
||||
lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"),
|
||||
lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"),
|
||||
# Alternative ISO formats (fallback)
|
||||
lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S"),
|
||||
lambda x: datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f"),
|
||||
|
||||
0
src/corelibs/email_handling/__init__.py
Normal file
0
src/corelibs/email_handling/__init__.py
Normal file
199
src/corelibs/email_handling/send_email.py
Normal file
199
src/corelibs/email_handling/send_email.py
Normal file
@@ -0,0 +1,199 @@
|
||||
"""
|
||||
Send email wrapper
|
||||
"""
|
||||
|
||||
import smtplib
|
||||
from email.message import EmailMessage
|
||||
from typing import TYPE_CHECKING, Any
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
|
||||
|
||||
class SendEmail:
|
||||
"""
|
||||
send emails based on a template to a list of receivers
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
log: "Logger",
|
||||
settings: dict[str, Any],
|
||||
template: dict[str, str],
|
||||
from_email: str,
|
||||
combined_send: bool = True,
|
||||
receivers: list[str] | None = None,
|
||||
data: list[dict[str, str]] | None = None,
|
||||
):
|
||||
"""
|
||||
init send email class
|
||||
|
||||
Args:
|
||||
template (dict): Dictionary with body and subject
|
||||
from_email (str): from email as "Name" <email>
|
||||
combined_send (bool): True for sending as one set for all receivers
|
||||
receivers (list): list of emails to send to
|
||||
data (dict): data to replace in template
|
||||
args (Namespace): _description_
|
||||
"""
|
||||
self.log = log
|
||||
self.settings = settings
|
||||
# internal settings
|
||||
self.template = template
|
||||
self.from_email = from_email
|
||||
self.combined_send = combined_send
|
||||
self.receivers = receivers
|
||||
self.data = data
|
||||
|
||||
def send_email(
|
||||
self,
|
||||
data: list[dict[str, str]] | None,
|
||||
receivers: list[str] | None,
|
||||
template: dict[str, str] | None = None,
|
||||
from_email: str | None = None,
|
||||
combined_send: bool | None = None,
|
||||
test_only: bool | None = None
|
||||
):
|
||||
"""
|
||||
build email and send
|
||||
|
||||
Arguments:
|
||||
data {list[dict[str, str]] | None} -- _description_
|
||||
receivers {list[str] | None} -- _description_
|
||||
combined_send {bool | None} -- _description_
|
||||
|
||||
Keyword Arguments:
|
||||
template {dict[str, str] | None} -- _description_ (default: {None})
|
||||
from_email {str | None} -- _description_ (default: {None})
|
||||
|
||||
Raises:
|
||||
ValueError: _description_
|
||||
ValueError: _description_
|
||||
"""
|
||||
if data is None and self.data is not None:
|
||||
data = self.data
|
||||
if data is None:
|
||||
raise ValueError("No replace data set, cannot send email")
|
||||
if receivers is None and self.receivers is not None:
|
||||
receivers = self.receivers
|
||||
if receivers is None:
|
||||
raise ValueError("No receivers list set, cannot send email")
|
||||
if combined_send is None:
|
||||
combined_send = self.combined_send
|
||||
if test_only is not None:
|
||||
self.settings['test'] = test_only
|
||||
|
||||
if template is None:
|
||||
template = self.template
|
||||
if from_email is None:
|
||||
from_email = self.from_email
|
||||
|
||||
if not template['subject'] or not template['body']:
|
||||
raise ValueError("Both Subject and Body must be set")
|
||||
|
||||
self.log.debug(
|
||||
"[EMAIL]:\n"
|
||||
f"Subject: {template['subject']}\n"
|
||||
f"Body: {template['body']}\n"
|
||||
f"From: {from_email}\n"
|
||||
f"Combined send: {combined_send}\n"
|
||||
f"Receivers: {receivers}\n"
|
||||
f"Replace data: {data}"
|
||||
)
|
||||
|
||||
# send email
|
||||
self.send_email_list(
|
||||
self.prepare_email_content(
|
||||
from_email, template, data
|
||||
),
|
||||
receivers,
|
||||
combined_send,
|
||||
test_only
|
||||
)
|
||||
|
||||
def prepare_email_content(
|
||||
self,
|
||||
from_email: str,
|
||||
template: dict[str, str],
|
||||
data: list[dict[str, str]],
|
||||
) -> list[EmailMessage]:
|
||||
"""
|
||||
prepare email for sending
|
||||
|
||||
Args:
|
||||
template (dict): template data for this email
|
||||
data (dict): data to replace in email
|
||||
|
||||
Returns:
|
||||
list: Email Message Objects as list
|
||||
"""
|
||||
_subject = ""
|
||||
_body = ""
|
||||
msg: list[EmailMessage] = []
|
||||
for replace in data:
|
||||
_subject = template["subject"]
|
||||
_body = template["body"]
|
||||
for key, value in replace.items():
|
||||
_subject = _subject.replace(f"{{{{{key}}}}}", value)
|
||||
_body = _body.replace(f"{{{{{key}}}}}", value)
|
||||
# create a simple email and add subhect, from email
|
||||
msg_email = EmailMessage()
|
||||
# msg.set_content(_body, charset='utf-8', cte='quoted-printable')
|
||||
msg_email.set_content(_body, charset="utf-8")
|
||||
msg_email["Subject"] = _subject
|
||||
msg_email["From"] = from_email
|
||||
# push to array for sening
|
||||
msg.append(msg_email)
|
||||
return msg
|
||||
|
||||
def send_email_list(
|
||||
self,
|
||||
email: list[EmailMessage], receivers: list[str],
|
||||
combined_send: bool | None = None,
|
||||
test_only: bool | None = None
|
||||
):
|
||||
"""
|
||||
send email to receivers list
|
||||
|
||||
Args:
|
||||
email (list): Email Message object with set obdy, subject, from as list
|
||||
receivers (array): email receivers list as array
|
||||
combined_send (bool): True for sending as one set for all receivers
|
||||
"""
|
||||
|
||||
if test_only is not None:
|
||||
self.settings['test'] = test_only
|
||||
|
||||
# localhost (postfix does the rest)
|
||||
smtp = None
|
||||
smtp_host = self.settings.get('smtp_host', "localhost")
|
||||
try:
|
||||
smtp = smtplib.SMTP(smtp_host)
|
||||
except ConnectionRefusedError as e:
|
||||
self.log.error("Could not open SMTP connection to: %s, %s", smtp_host, e)
|
||||
# loop over messages and then over recievers
|
||||
for msg in email:
|
||||
if combined_send is True:
|
||||
msg["To"] = ", ".join(receivers)
|
||||
if not self.settings.get('test'):
|
||||
if smtp is not None:
|
||||
smtp.send_message(msg, msg["From"], receivers)
|
||||
else:
|
||||
self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
|
||||
else:
|
||||
for receiver in receivers:
|
||||
# send to
|
||||
self.log.debug(f"===> Send to: {receiver}")
|
||||
if "To" in msg:
|
||||
msg.replace_header("To", receiver)
|
||||
else:
|
||||
msg["To"] = receiver
|
||||
if not self.settings.get('test'):
|
||||
if smtp is not None:
|
||||
smtp.send_message(msg)
|
||||
else:
|
||||
self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
|
||||
# close smtp
|
||||
if smtp is not None:
|
||||
smtp.quit()
|
||||
|
||||
# __END__
|
||||
75
src/corelibs/file_handling/file_bom_encoding.py
Normal file
75
src/corelibs/file_handling/file_bom_encoding.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""
|
||||
File check if BOM encoded, needed for CSV load
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import TypedDict
|
||||
|
||||
|
||||
class BomEncodingInfo(TypedDict):
|
||||
"""BOM encoding info"""
|
||||
has_bom: bool
|
||||
bom_type: str | None
|
||||
encoding: str | None
|
||||
bom_length: int
|
||||
bom_pattern: bytes | None
|
||||
|
||||
|
||||
def is_bom_encoded(file_path: Path) -> bool:
|
||||
"""
|
||||
Detect if a file is BOM encoded
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the file to check
|
||||
|
||||
Returns:
|
||||
bool: True if file has BOM, False otherwise
|
||||
"""
|
||||
return is_bom_encoded_info(file_path)['has_bom']
|
||||
|
||||
|
||||
def is_bom_encoded_info(file_path: Path) -> BomEncodingInfo:
|
||||
"""
|
||||
Enhanced BOM detection with additional file analysis
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the file to check
|
||||
|
||||
Returns:
|
||||
dict: Comprehensive BOM and encoding information
|
||||
"""
|
||||
try:
|
||||
# Read first 1024 bytes for analysis
|
||||
with open(file_path, 'rb') as f:
|
||||
header = f.read(4)
|
||||
|
||||
bom_patterns = {
|
||||
b'\xef\xbb\xbf': ('UTF-8', 'utf-8', 3),
|
||||
b'\xff\xfe\x00\x00': ('UTF-32 LE', 'utf-32-le', 4),
|
||||
b'\x00\x00\xfe\xff': ('UTF-32 BE', 'utf-32-be', 4),
|
||||
b'\xff\xfe': ('UTF-16 LE', 'utf-16-le', 2),
|
||||
b'\xfe\xff': ('UTF-16 BE', 'utf-16-be', 2),
|
||||
}
|
||||
|
||||
for bom_pattern, (encoding_name, encoding, length) in bom_patterns.items():
|
||||
if header.startswith(bom_pattern):
|
||||
return {
|
||||
'has_bom': True,
|
||||
'bom_type': encoding_name,
|
||||
'encoding': encoding,
|
||||
'bom_length': length,
|
||||
'bom_pattern': bom_pattern
|
||||
}
|
||||
|
||||
return {
|
||||
'has_bom': False,
|
||||
'bom_type': None,
|
||||
'encoding': None,
|
||||
'bom_length': 0,
|
||||
'bom_pattern': None
|
||||
}
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error checking BOM encoding: {e}") from e
|
||||
|
||||
|
||||
# __END__
|
||||
31
test-run/file_handling/file_bom_check.py
Normal file
31
test-run/file_handling/file_bom_check.py
Normal file
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
BOM check for files
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Check files for BOM encoding
|
||||
"""
|
||||
base_path = Path(__file__).resolve().parent
|
||||
for file_path in [
|
||||
'test-data/sample_with_bom.csv',
|
||||
'test-data/sample_without_bom.csv',
|
||||
]:
|
||||
has_bom = is_bom_encoded(base_path.joinpath(file_path))
|
||||
bom_info = is_bom_encoded_info(base_path.joinpath(file_path))
|
||||
print(f'File: {file_path}')
|
||||
print(f' Has BOM: {has_bom}')
|
||||
print(f' BOM Info: {dump_data(bom_info)}')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# __END__
|
||||
6
test-run/file_handling/test-data/sample_with_bom.csv
Normal file
6
test-run/file_handling/test-data/sample_with_bom.csv
Normal file
@@ -0,0 +1,6 @@
|
||||
Name,Age,City,Country
|
||||
John Doe,25,New York,USA
|
||||
Jane Smith,30,London,UK
|
||||
山田太郎,28,東京,Japan
|
||||
María García,35,Madrid,Spain
|
||||
François Dupont,42,Paris,France
|
||||
|
6
test-run/file_handling/test-data/sample_without_bom.csv
Normal file
6
test-run/file_handling/test-data/sample_without_bom.csv
Normal file
@@ -0,0 +1,6 @@
|
||||
Name,Age,City,Country
|
||||
John Doe,25,New York,USA
|
||||
Jane Smith,30,London,UK
|
||||
山田太郎,28,東京,Japan
|
||||
María García,35,Madrid,Spain
|
||||
François Dupont,42,Paris,France
|
||||
|
0
tests/integration/fixtures/__init__.py
Normal file
0
tests/integration/fixtures/__init__.py
Normal file
@@ -275,6 +275,53 @@ class TestParseFlexibleDate:
|
||||
assert isinstance(result, datetime)
|
||||
assert result.tzinfo is not None
|
||||
|
||||
def test_parse_flexible_date_missing_t_with_timezone_shift(self):
|
||||
"""Test parse_flexible_date with timezone shift"""
|
||||
result = parse_flexible_date('2023-12-25 15:30:45+00:00', timezone_tz='Asia/Tokyo', shift_time_zone=True)
|
||||
assert isinstance(result, datetime)
|
||||
assert result.tzinfo is not None
|
||||
|
||||
def test_parse_flexible_date_space_separated_datetime(self):
|
||||
"""Test parse_flexible_date with space-separated datetime format"""
|
||||
result = parse_flexible_date('2023-12-25 15:30:45')
|
||||
assert isinstance(result, datetime)
|
||||
assert result.year == 2023
|
||||
assert result.month == 12
|
||||
assert result.day == 25
|
||||
assert result.hour == 15
|
||||
assert result.minute == 30
|
||||
assert result.second == 45
|
||||
|
||||
def test_parse_flexible_date_space_separated_with_microseconds(self):
|
||||
"""Test parse_flexible_date with space-separated datetime and microseconds"""
|
||||
result = parse_flexible_date('2023-12-25 15:30:45.123456')
|
||||
assert isinstance(result, datetime)
|
||||
assert result.year == 2023
|
||||
assert result.month == 12
|
||||
assert result.day == 25
|
||||
assert result.hour == 15
|
||||
assert result.minute == 30
|
||||
assert result.second == 45
|
||||
assert result.microsecond == 123456
|
||||
|
||||
def test_parse_flexible_date_t_separated_datetime(self):
|
||||
"""Test parse_flexible_date with T-separated datetime (alternative ISO format)"""
|
||||
result = parse_flexible_date('2023-12-25T15:30:45')
|
||||
assert isinstance(result, datetime)
|
||||
assert result.year == 2023
|
||||
assert result.month == 12
|
||||
assert result.day == 25
|
||||
assert result.hour == 15
|
||||
assert result.minute == 30
|
||||
assert result.second == 45
|
||||
|
||||
def test_parse_flexible_date_t_separated_with_microseconds(self):
|
||||
"""Test parse_flexible_date with T-separated datetime and microseconds"""
|
||||
result = parse_flexible_date('2023-12-25T15:30:45.123456')
|
||||
assert isinstance(result, datetime)
|
||||
assert result.year == 2023
|
||||
assert result.microsecond == 123456
|
||||
|
||||
def test_parse_flexible_date_invalid_format(self):
|
||||
"""Test parse_flexible_date with invalid format returns None"""
|
||||
result = parse_flexible_date('invalid-date')
|
||||
|
||||
1249
tests/unit/email_handling/test_send_email.py
Normal file
1249
tests/unit/email_handling/test_send_email.py
Normal file
File diff suppressed because it is too large
Load Diff
538
tests/unit/file_handling/test_file_bom_encoding.py
Normal file
538
tests/unit/file_handling/test_file_bom_encoding.py
Normal file
@@ -0,0 +1,538 @@
|
||||
"""
|
||||
PyTest: file_handling/file_bom_encoding
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
|
||||
from corelibs.file_handling.file_bom_encoding import (
|
||||
is_bom_encoded,
|
||||
is_bom_encoded_info,
|
||||
BomEncodingInfo,
|
||||
)
|
||||
|
||||
|
||||
class TestIsBomEncoded:
|
||||
"""Test suite for is_bom_encoded function"""
|
||||
|
||||
def test_utf8_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-8 BOM encoded file"""
|
||||
test_file = tmp_path / "utf8_bom.txt"
|
||||
# UTF-8 BOM: EF BB BF
|
||||
content = b'\xef\xbb\xbfHello, World!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_utf16_le_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-16 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_le_bom.txt"
|
||||
# UTF-16 LE BOM: FF FE
|
||||
content = b'\xff\xfeH\x00e\x00l\x00l\x00o\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_utf16_be_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-16 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_be_bom.txt"
|
||||
# UTF-16 BE BOM: FE FF
|
||||
content = b'\xfe\xff\x00H\x00e\x00l\x00l\x00o'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_utf32_le_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-32 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_le_bom.txt"
|
||||
# UTF-32 LE BOM: FF FE 00 00
|
||||
content = b'\xff\xfe\x00\x00H\x00\x00\x00e\x00\x00\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_utf32_be_bom_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-32 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_be_bom.txt"
|
||||
# UTF-32 BE BOM: 00 00 FE FF
|
||||
content = b'\x00\x00\xfe\xff\x00\x00\x00H\x00\x00\x00e'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_no_bom_ascii_file(self, tmp_path: Path):
|
||||
"""Test detection of ASCII file without BOM"""
|
||||
test_file = tmp_path / "ascii.txt"
|
||||
content = b'Hello, World!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_no_bom_utf8_file(self, tmp_path: Path):
|
||||
"""Test detection of UTF-8 file without BOM"""
|
||||
test_file = tmp_path / "utf8_no_bom.txt"
|
||||
content = 'Hello, 世界!'.encode('utf-8')
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_empty_file(self, tmp_path: Path):
|
||||
"""Test detection on empty file"""
|
||||
test_file = tmp_path / "empty.txt"
|
||||
test_file.write_bytes(b'')
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_binary_file_no_bom(self, tmp_path: Path):
|
||||
"""Test detection on binary file without BOM"""
|
||||
test_file = tmp_path / "binary.bin"
|
||||
content = bytes(range(256))
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_partial_bom_pattern(self, tmp_path: Path):
|
||||
"""Test file with partial BOM pattern that shouldn't match"""
|
||||
test_file = tmp_path / "partial_bom.txt"
|
||||
# Only first two bytes of UTF-8 BOM
|
||||
content = b'\xef\xbbHello'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_false_positive_bom_pattern(self, tmp_path: Path):
|
||||
"""Test file that contains BOM-like bytes but not at the start"""
|
||||
test_file = tmp_path / "false_positive.txt"
|
||||
content = b'Hello\xef\xbb\xbfWorld'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_nonexistent_file(self, tmp_path: Path):
|
||||
"""Test that function raises error for non-existent file"""
|
||||
test_file = tmp_path / "nonexistent.txt"
|
||||
|
||||
with pytest.raises(ValueError, match="Error checking BOM encoding"):
|
||||
is_bom_encoded(test_file)
|
||||
|
||||
def test_very_small_file(self, tmp_path: Path):
|
||||
"""Test file smaller than largest BOM pattern (4 bytes)"""
|
||||
test_file = tmp_path / "small.txt"
|
||||
content = b'Hi'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is False
|
||||
|
||||
def test_exactly_bom_size_utf8(self, tmp_path: Path):
|
||||
"""Test file that is exactly the size of UTF-8 BOM"""
|
||||
test_file = tmp_path / "exact_bom.txt"
|
||||
content = b'\xef\xbb\xbf'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
def test_exactly_bom_size_utf32(self, tmp_path: Path):
|
||||
"""Test file that is exactly the size of UTF-32 BOM"""
|
||||
test_file = tmp_path / "exact_bom_utf32.txt"
|
||||
content = b'\xff\xfe\x00\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded(test_file)
|
||||
assert result is True
|
||||
|
||||
|
||||
class TestIsBomEncodedInfo:
|
||||
"""Test suite for is_bom_encoded_info function"""
|
||||
|
||||
def test_utf8_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-8 BOM encoded file"""
|
||||
test_file = tmp_path / "utf8_bom.txt"
|
||||
content = b'\xef\xbb\xbfHello, UTF-8!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert isinstance(result, dict)
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-8'
|
||||
assert result['encoding'] == 'utf-8'
|
||||
assert result['bom_length'] == 3
|
||||
assert result['bom_pattern'] == b'\xef\xbb\xbf'
|
||||
|
||||
def test_utf16_le_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-16 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_le_bom.txt"
|
||||
content = b'\xff\xfeH\x00e\x00l\x00l\x00o\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-16 LE'
|
||||
assert result['encoding'] == 'utf-16-le'
|
||||
assert result['bom_length'] == 2
|
||||
assert result['bom_pattern'] == b'\xff\xfe'
|
||||
|
||||
def test_utf16_be_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-16 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf16_be_bom.txt"
|
||||
content = b'\xfe\xff\x00H\x00e\x00l\x00l\x00o'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-16 BE'
|
||||
assert result['encoding'] == 'utf-16-be'
|
||||
assert result['bom_length'] == 2
|
||||
assert result['bom_pattern'] == b'\xfe\xff'
|
||||
|
||||
def test_utf32_le_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-32 LE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_le_bom.txt"
|
||||
content = b'\xff\xfe\x00\x00H\x00\x00\x00e\x00\x00\x00'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-32 LE'
|
||||
assert result['encoding'] == 'utf-32-le'
|
||||
assert result['bom_length'] == 4
|
||||
assert result['bom_pattern'] == b'\xff\xfe\x00\x00'
|
||||
|
||||
def test_utf32_be_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for UTF-32 BE BOM encoded file"""
|
||||
test_file = tmp_path / "utf32_be_bom.txt"
|
||||
content = b'\x00\x00\xfe\xff\x00\x00\x00H\x00\x00\x00e'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-32 BE'
|
||||
assert result['encoding'] == 'utf-32-be'
|
||||
assert result['bom_length'] == 4
|
||||
assert result['bom_pattern'] == b'\x00\x00\xfe\xff'
|
||||
|
||||
def test_no_bom_info(self, tmp_path: Path):
|
||||
"""Test detailed info for file without BOM"""
|
||||
test_file = tmp_path / "no_bom.txt"
|
||||
content = b'Hello, World!'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is False
|
||||
assert result['bom_type'] is None
|
||||
assert result['encoding'] is None
|
||||
assert result['bom_length'] == 0
|
||||
assert result['bom_pattern'] is None
|
||||
|
||||
def test_empty_file_info(self, tmp_path: Path):
|
||||
"""Test detailed info for empty file"""
|
||||
test_file = tmp_path / "empty.txt"
|
||||
test_file.write_bytes(b'')
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is False
|
||||
assert result['bom_type'] is None
|
||||
assert result['encoding'] is None
|
||||
assert result['bom_length'] == 0
|
||||
assert result['bom_pattern'] is None
|
||||
|
||||
def test_bom_precedence_utf32_vs_utf16(self, tmp_path: Path):
|
||||
"""Test that UTF-32 LE BOM takes precedence over UTF-16 LE when both match"""
|
||||
test_file = tmp_path / "precedence.txt"
|
||||
# UTF-32 LE BOM starts with UTF-16 LE BOM pattern
|
||||
content = b'\xff\xfe\x00\x00Additional content'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
# Should detect UTF-32 LE, not UTF-16 LE
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-32 LE'
|
||||
assert result['encoding'] == 'utf-32-le'
|
||||
assert result['bom_length'] == 4
|
||||
assert result['bom_pattern'] == b'\xff\xfe\x00\x00'
|
||||
|
||||
def test_return_type_validation(self, tmp_path: Path):
|
||||
"""Test that return type matches BomEncodingInfo TypedDict"""
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_bytes(b'Test content')
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
# Check all required keys are present
|
||||
required_keys = {'has_bom', 'bom_type', 'encoding', 'bom_length', 'bom_pattern'}
|
||||
assert set(result.keys()) == required_keys
|
||||
|
||||
# Check types
|
||||
assert isinstance(result['has_bom'], bool)
|
||||
assert result['bom_type'] is None or isinstance(result['bom_type'], str)
|
||||
assert result['encoding'] is None or isinstance(result['encoding'], str)
|
||||
assert isinstance(result['bom_length'], int)
|
||||
assert result['bom_pattern'] is None or isinstance(result['bom_pattern'], bytes)
|
||||
|
||||
def test_nonexistent_file_error(self, tmp_path: Path):
|
||||
"""Test that function raises ValueError for non-existent file"""
|
||||
test_file = tmp_path / "nonexistent.txt"
|
||||
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
is_bom_encoded_info(test_file)
|
||||
|
||||
assert "Error checking BOM encoding" in str(exc_info.value)
|
||||
|
||||
def test_directory_instead_of_file(self, tmp_path: Path):
|
||||
"""Test that function raises error when given a directory"""
|
||||
test_dir = tmp_path / "test_dir"
|
||||
test_dir.mkdir()
|
||||
|
||||
with pytest.raises(ValueError, match="Error checking BOM encoding"):
|
||||
is_bom_encoded_info(test_dir)
|
||||
|
||||
def test_large_file_with_bom(self, tmp_path: Path):
|
||||
"""Test BOM detection on large file (only first 4 bytes matter)"""
|
||||
test_file = tmp_path / "large_bom.txt"
|
||||
# UTF-8 BOM followed by large content
|
||||
content = b'\xef\xbb\xbf' + b'A' * 100000
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-8'
|
||||
assert result['encoding'] == 'utf-8'
|
||||
|
||||
def test_bom_detection_priority_order(self, tmp_path: Path):
|
||||
"""Test that BOM patterns are checked in the correct priority order"""
|
||||
# The function should check longer patterns first to avoid false matches
|
||||
test_cases = [
|
||||
(b'\xff\xfe\x00\x00', 'UTF-32 LE'), # 4 bytes
|
||||
(b'\x00\x00\xfe\xff', 'UTF-32 BE'), # 4 bytes
|
||||
(b'\xff\xfe', 'UTF-16 LE'), # 2 bytes
|
||||
(b'\xfe\xff', 'UTF-16 BE'), # 2 bytes
|
||||
(b'\xef\xbb\xbf', 'UTF-8'), # 3 bytes
|
||||
]
|
||||
|
||||
for i, (bom_bytes, expected_type) in enumerate(test_cases):
|
||||
test_file = tmp_path / f"priority_test_{i}.txt"
|
||||
content = bom_bytes + b'Content'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
assert result['bom_type'] == expected_type
|
||||
assert result['bom_pattern'] == bom_bytes
|
||||
|
||||
def test_csv_file_with_utf8_bom(self, tmp_path: Path):
|
||||
"""Test CSV file with UTF-8 BOM (common use case mentioned in docstring)"""
|
||||
test_file = tmp_path / "data.csv"
|
||||
content = b'\xef\xbb\xbf"Name","Age","City"\n"John",30,"New York"\n"Jane",25,"Tokyo"'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is True
|
||||
assert result['bom_type'] == 'UTF-8'
|
||||
assert result['encoding'] == 'utf-8'
|
||||
assert result['bom_length'] == 3
|
||||
|
||||
def test_csv_file_without_bom(self, tmp_path: Path):
|
||||
"""Test CSV file without BOM"""
|
||||
test_file = tmp_path / "data_no_bom.csv"
|
||||
content = b'"Name","Age","City"\n"John",30,"New York"\n"Jane",25,"Tokyo"'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert result['has_bom'] is False
|
||||
assert result['bom_type'] is None
|
||||
assert result['encoding'] is None
|
||||
assert result['bom_length'] == 0
|
||||
|
||||
|
||||
class TestBomEncodingInfo:
|
||||
"""Test suite for BomEncodingInfo TypedDict"""
|
||||
|
||||
def test_typed_dict_structure(self):
|
||||
"""Test that BomEncodingInfo has correct structure"""
|
||||
# This is a type check - in actual usage, mypy would validate this
|
||||
sample_info: BomEncodingInfo = {
|
||||
'has_bom': True,
|
||||
'bom_type': 'UTF-8',
|
||||
'encoding': 'utf-8',
|
||||
'bom_length': 3,
|
||||
'bom_pattern': b'\xef\xbb\xbf'
|
||||
}
|
||||
|
||||
assert sample_info['has_bom'] is True
|
||||
assert sample_info['bom_type'] == 'UTF-8'
|
||||
assert sample_info['encoding'] == 'utf-8'
|
||||
assert sample_info['bom_length'] == 3
|
||||
assert sample_info['bom_pattern'] == b'\xef\xbb\xbf'
|
||||
|
||||
def test_typed_dict_none_values(self):
|
||||
"""Test TypedDict with None values"""
|
||||
sample_info: BomEncodingInfo = {
|
||||
'has_bom': False,
|
||||
'bom_type': None,
|
||||
'encoding': None,
|
||||
'bom_length': 0,
|
||||
'bom_pattern': None
|
||||
}
|
||||
|
||||
assert sample_info['has_bom'] is False
|
||||
assert sample_info['bom_type'] is None
|
||||
assert sample_info['encoding'] is None
|
||||
assert sample_info['bom_length'] == 0
|
||||
assert sample_info['bom_pattern'] is None
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for BOM encoding detection"""
|
||||
|
||||
def test_is_bom_encoded_uses_info_function(self, tmp_path: Path):
|
||||
"""Test that is_bom_encoded uses is_bom_encoded_info internally"""
|
||||
test_file = tmp_path / "integration.txt"
|
||||
content = b'\xef\xbb\xbfIntegration test'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Both functions should return consistent results
|
||||
simple_result = is_bom_encoded(test_file)
|
||||
detailed_result = is_bom_encoded_info(test_file)
|
||||
|
||||
assert simple_result == detailed_result['has_bom']
|
||||
assert simple_result is True
|
||||
|
||||
def test_multiple_file_bom_detection_workflow(self, tmp_path: Path):
|
||||
"""Test a workflow of detecting BOM across multiple files"""
|
||||
files = {
|
||||
'utf8_bom.csv': b'\xef\xbb\xbf"data","value"\n"test",123',
|
||||
'utf16_le.txt': b'\xff\xfeH\x00e\x00l\x00l\x00o\x00',
|
||||
'no_bom.txt': b'Plain ASCII text',
|
||||
'empty.txt': b'',
|
||||
}
|
||||
|
||||
results = {}
|
||||
detailed_results = {}
|
||||
|
||||
for filename, content in files.items():
|
||||
file_path = tmp_path / filename
|
||||
file_path.write_bytes(content)
|
||||
|
||||
results[filename] = is_bom_encoded(file_path)
|
||||
detailed_results[filename] = is_bom_encoded_info(file_path)
|
||||
|
||||
# Verify results
|
||||
assert results['utf8_bom.csv'] is True
|
||||
assert results['utf16_le.txt'] is True
|
||||
assert results['no_bom.txt'] is False
|
||||
assert results['empty.txt'] is False
|
||||
|
||||
# Verify detailed results match simple results
|
||||
for filename in files:
|
||||
assert results[filename] == detailed_results[filename]['has_bom']
|
||||
|
||||
# Verify specific encoding details
|
||||
assert detailed_results['utf8_bom.csv']['encoding'] == 'utf-8'
|
||||
assert detailed_results['utf16_le.txt']['encoding'] == 'utf-16-le'
|
||||
assert detailed_results['no_bom.txt']['encoding'] is None
|
||||
|
||||
def test_csv_loading_workflow(self, tmp_path: Path):
|
||||
"""Test BOM detection workflow for CSV loading (main use case)"""
|
||||
# Create CSV files with and without BOM
|
||||
csv_with_bom = tmp_path / "data_with_bom.csv"
|
||||
csv_without_bom = tmp_path / "data_without_bom.csv"
|
||||
|
||||
# CSV with UTF-8 BOM
|
||||
bom_content = b'\xef\xbb\xbf"Name","Age"\n"Alice",30\n"Bob",25'
|
||||
csv_with_bom.write_bytes(bom_content)
|
||||
|
||||
# CSV without BOM
|
||||
no_bom_content = b'"Name","Age"\n"Charlie",35\n"Diana",28'
|
||||
csv_without_bom.write_bytes(no_bom_content)
|
||||
|
||||
# Simulate CSV loading workflow
|
||||
files_to_process = [csv_with_bom, csv_without_bom]
|
||||
processing_info: list[dict[str, str | bool | int]] = []
|
||||
|
||||
for csv_file in files_to_process:
|
||||
bom_info = is_bom_encoded_info(csv_file)
|
||||
|
||||
file_info: dict[str, str | bool | int] = {
|
||||
'file': csv_file.name,
|
||||
'has_bom': bom_info['has_bom'],
|
||||
'encoding': bom_info['encoding'] or 'default',
|
||||
'skip_bytes': bom_info['bom_length']
|
||||
}
|
||||
processing_info.append(file_info)
|
||||
|
||||
# Verify workflow results
|
||||
assert len(processing_info) == 2
|
||||
|
||||
bom_file_info = next(info for info in processing_info if info['file'] == 'data_with_bom.csv')
|
||||
no_bom_file_info = next(info for info in processing_info if info['file'] == 'data_without_bom.csv')
|
||||
|
||||
assert bom_file_info['has_bom'] is True
|
||||
assert bom_file_info['encoding'] == 'utf-8'
|
||||
assert bom_file_info['skip_bytes'] == 3
|
||||
|
||||
assert no_bom_file_info['has_bom'] is False
|
||||
assert no_bom_file_info['encoding'] == 'default'
|
||||
assert no_bom_file_info['skip_bytes'] == 0
|
||||
|
||||
def test_error_handling_consistency(self, tmp_path: Path):
|
||||
"""Test that both functions handle errors consistently"""
|
||||
nonexistent_file = tmp_path / "does_not_exist.txt"
|
||||
|
||||
# Both functions should raise ValueError for non-existent files
|
||||
with pytest.raises(ValueError):
|
||||
is_bom_encoded(nonexistent_file)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
is_bom_encoded_info(nonexistent_file)
|
||||
|
||||
def test_all_supported_bom_types(self, tmp_path: Path):
|
||||
"""Test detection of all supported BOM types"""
|
||||
bom_test_cases = [
|
||||
('utf8', b'\xef\xbb\xbf', 'UTF-8', 'utf-8', 3),
|
||||
('utf16_le', b'\xff\xfe', 'UTF-16 LE', 'utf-16-le', 2),
|
||||
('utf16_be', b'\xfe\xff', 'UTF-16 BE', 'utf-16-be', 2),
|
||||
('utf32_le', b'\xff\xfe\x00\x00', 'UTF-32 LE', 'utf-32-le', 4),
|
||||
('utf32_be', b'\x00\x00\xfe\xff', 'UTF-32 BE', 'utf-32-be', 4),
|
||||
]
|
||||
|
||||
for name, bom_bytes, expected_type, expected_encoding, expected_length in bom_test_cases:
|
||||
test_file = tmp_path / f"{name}_test.txt"
|
||||
content = bom_bytes + b'Test content'
|
||||
test_file.write_bytes(content)
|
||||
|
||||
# Test simple function
|
||||
assert is_bom_encoded(test_file) is True
|
||||
|
||||
# Test detailed function
|
||||
info = is_bom_encoded_info(test_file)
|
||||
assert info['has_bom'] is True
|
||||
assert info['bom_type'] == expected_type
|
||||
assert info['encoding'] == expected_encoding
|
||||
assert info['bom_length'] == expected_length
|
||||
assert info['bom_pattern'] == bom_bytes
|
||||
|
||||
|
||||
# __END__
|
||||
Reference in New Issue
Block a user