Compare commits
32 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6322b95068 | ||
|
|
715ed1f9c2 | ||
|
|
82a759dd21 | ||
|
|
fe913608c4 | ||
|
|
79f9c5d1c6 | ||
|
|
3d091129e2 | ||
|
|
1a978f786d | ||
|
|
51669d3c5f | ||
|
|
d128dcb479 | ||
|
|
84286593f6 | ||
|
|
8d97f09e5e | ||
|
|
2748bc19be | ||
|
|
0b3c8fc774 | ||
|
|
7da18e0f00 | ||
|
|
49e38081ad | ||
|
|
a14f993a31 | ||
|
|
ae938f9909 | ||
|
|
f91e0bb93a | ||
|
|
d3f61005cf | ||
|
|
2923a3e88b | ||
|
|
a73ced0067 | ||
|
|
f89b91fe7f | ||
|
|
5950485d46 | ||
|
|
f349927a63 | ||
|
|
dfe8890598 | ||
|
|
d224876a8e | ||
|
|
17e8c76b94 | ||
|
|
9034a31cd6 | ||
|
|
523e61c9f7 | ||
|
|
cf575ded90 | ||
|
|
11a75d8532 | ||
|
|
6593e11332 |
@@ -1,7 +1,7 @@
|
||||
# MARK: Project info
|
||||
[project]
|
||||
name = "corelibs"
|
||||
version = "0.42.2"
|
||||
version = "0.47.0"
|
||||
description = "Collection of utils for Python scripts"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
|
||||
@@ -19,9 +19,26 @@ def compile_re(reg: str) -> re.Pattern[str]:
|
||||
|
||||
|
||||
# email regex
|
||||
EMAIL_BASIC_REGEX: str = r"""
|
||||
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
||||
SUB_EMAIL_BASIC_REGEX: str = r"""
|
||||
[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}
|
||||
"""
|
||||
EMAIL_BASIC_REGEX = rf"^{SUB_EMAIL_BASIC_REGEX}$"
|
||||
# name + email regex for email sending type like "foo bar" <email@mail.com>
|
||||
NAME_EMAIL_SIMPLE_REGEX = r"""
|
||||
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
|
||||
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
|
||||
<(?P<email3>[^>]+)>|
|
||||
(?P<email4>[^\s<>]+))\s*$
|
||||
"""
|
||||
# name + email with the basic regex set
|
||||
NAME_EMAIL_BASIC_REGEX = rf"""
|
||||
^\s*(?:
|
||||
"(?P<name1>[^"]+)"\s*<(?P<email1>{SUB_EMAIL_BASIC_REGEX})>|
|
||||
(?P<name2>.+?)\s*<(?P<email2>{SUB_EMAIL_BASIC_REGEX})>|
|
||||
<(?P<email3>{SUB_EMAIL_BASIC_REGEX})>|
|
||||
(?P<email4>{SUB_EMAIL_BASIC_REGEX})
|
||||
)\s*$
|
||||
"""
|
||||
# Domain regex with localhost
|
||||
DOMAIN_WITH_LOCALHOST_REGEX: str = r"""
|
||||
|
||||
23
src/corelibs/check_handling/regex_constants_compiled.py
Normal file
23
src/corelibs/check_handling/regex_constants_compiled.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""
|
||||
List of regex compiled strings that can be used
|
||||
"""
|
||||
|
||||
from corelibs.check_handling.regex_constants import (
|
||||
compile_re,
|
||||
EMAIL_BASIC_REGEX,
|
||||
NAME_EMAIL_SIMPLE_REGEX,
|
||||
NAME_EMAIL_BASIC_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
DOMAIN_REGEX
|
||||
)
|
||||
|
||||
# all above in compiled form
|
||||
COMPILED_EMAIL_BASIC_REGEX = compile_re(EMAIL_BASIC_REGEX)
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX = compile_re(NAME_EMAIL_SIMPLE_REGEX)
|
||||
COMPILED_NAME_EMAIL_BASIC_REGEX = compile_re(NAME_EMAIL_BASIC_REGEX)
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
|
||||
COMPILED_DOMAIN_REGEX = compile_re(DOMAIN_REGEX)
|
||||
|
||||
# __END__
|
||||
@@ -53,6 +53,9 @@ class SettingsLoader:
|
||||
# for check settings, abort flag
|
||||
self.__check_settings_abort: bool = False
|
||||
|
||||
# error messages for raise ValueError
|
||||
self.__error_msg: list[str] = []
|
||||
|
||||
# MARK: load settings
|
||||
def load_settings(
|
||||
self,
|
||||
@@ -87,6 +90,8 @@ class SettingsLoader:
|
||||
Returns:
|
||||
dict[str, str]: key = value list
|
||||
"""
|
||||
# reset error message list before run
|
||||
self.__error_msg = []
|
||||
# default set entries
|
||||
entry_set_empty: dict[str, str | None] = {}
|
||||
# entries that have to be split
|
||||
@@ -109,7 +114,7 @@ class SettingsLoader:
|
||||
if allow_not_exist is True:
|
||||
return {}
|
||||
raise ValueError(self.__print(
|
||||
f"[!] Cannot read [{config_id}] block in the {self.config_file}: {e}",
|
||||
f"[!] Cannot read [{config_id}] block in the file {self.config_file}: {e}",
|
||||
'CRITICAL'
|
||||
)) from e
|
||||
try:
|
||||
@@ -168,10 +173,13 @@ class SettingsLoader:
|
||||
args_overrride.append(key)
|
||||
if skip:
|
||||
continue
|
||||
settings[config_id][key] = [
|
||||
__value.replace(" ", "")
|
||||
for __value in settings[config_id][key].split(split_char)
|
||||
]
|
||||
if settings[config_id][key]:
|
||||
settings[config_id][key] = [
|
||||
__value.replace(" ", "")
|
||||
for __value in settings[config_id][key].split(split_char)
|
||||
]
|
||||
else:
|
||||
settings[config_id][key] = []
|
||||
except KeyError as e:
|
||||
raise ValueError(self.__print(
|
||||
f"[!] Cannot read [{config_id}] block because the entry [{e}] could not be found",
|
||||
@@ -181,9 +189,8 @@ class SettingsLoader:
|
||||
# ignore error if arguments are set
|
||||
if not self.__check_arguments(config_validate, True):
|
||||
raise ValueError(self.__print(f"[!] Cannot find file: {self.config_file}", 'CRITICAL'))
|
||||
else:
|
||||
# base set
|
||||
settings[config_id] = {}
|
||||
# base set
|
||||
settings[config_id] = {}
|
||||
# make sure all are set
|
||||
# if we have arguments set, this override config settings
|
||||
error: bool = False
|
||||
@@ -274,7 +281,10 @@ class SettingsLoader:
|
||||
error = True
|
||||
self.__print(f"[!] Missing content entry for: {entry}", 'ERROR')
|
||||
if error is True:
|
||||
raise ValueError(self.__print("[!] Missing or incorrect settings data. Cannot proceed", 'CRITICAL'))
|
||||
self.__print("[!] Missing or incorrect settings data. Cannot proceed", 'CRITICAL')
|
||||
raise ValueError(
|
||||
"Missing or incorrect settings data. Cannot proceed: " + "; ".join(self.__error_msg)
|
||||
)
|
||||
# set empty
|
||||
for [entry, empty_set] in entry_set_empty.items():
|
||||
# if set, skip, else set to empty value
|
||||
@@ -567,7 +577,10 @@ class SettingsLoader:
|
||||
self.log.logger.log(Log.get_log_level_int(level), msg, stacklevel=2)
|
||||
if self.log is None or self.always_print:
|
||||
if print_error:
|
||||
print(msg)
|
||||
print(f"[SettingsLoader] {msg}")
|
||||
if level == 'ERROR':
|
||||
# remove any prefix [!] for error message list
|
||||
self.__error_msg.append(msg.replace('[!] ', '').strip())
|
||||
return msg
|
||||
|
||||
|
||||
|
||||
@@ -7,10 +7,13 @@ from typing import Any, Sequence
|
||||
from pathlib import Path
|
||||
from collections import Counter
|
||||
import csv
|
||||
from corelibs.file_handling.file_bom_encoding import is_bom_encoded, is_bom_encoded_info
|
||||
from corelibs.exceptions.csv_exceptions import (
|
||||
NoCsvReader, CompulsoryCsvHeaderCheckFailed, CsvHeaderDataMissing
|
||||
)
|
||||
|
||||
ENCODING = 'utf-8'
|
||||
ENCODING_UTF8_SIG = 'utf-8-sig'
|
||||
DELIMITER = ","
|
||||
QUOTECHAR = '"'
|
||||
# type: _QuotingType
|
||||
@@ -27,6 +30,7 @@ class CsvWriter:
|
||||
file_name: Path,
|
||||
header_mapping: dict[str, str],
|
||||
header_order: list[str] | None = None,
|
||||
encoding: str = ENCODING,
|
||||
delimiter: str = DELIMITER,
|
||||
quotechar: str = QUOTECHAR,
|
||||
quoting: Any = QUOTING,
|
||||
@@ -38,6 +42,7 @@ class CsvWriter:
|
||||
self.__delimiter = delimiter
|
||||
self.__quotechar = quotechar
|
||||
self.__quoting = quoting
|
||||
self.__encoding = encoding
|
||||
self.csv_file_writer = self.__open_csv(header_order)
|
||||
|
||||
def __open_csv(self, header_order: list[str] | None) -> csv.DictWriter[str]:
|
||||
@@ -69,7 +74,8 @@ class CsvWriter:
|
||||
try:
|
||||
fp = open(
|
||||
self.__file_name,
|
||||
"w", encoding="utf-8"
|
||||
"w",
|
||||
encoding=self.__encoding
|
||||
)
|
||||
csv_file_writer = csv.DictWriter(
|
||||
fp,
|
||||
@@ -109,6 +115,7 @@ class CsvReader:
|
||||
self,
|
||||
file_name: Path,
|
||||
header_check: Sequence[str] | None = None,
|
||||
encoding: str = ENCODING,
|
||||
delimiter: str = DELIMITER,
|
||||
quotechar: str = QUOTECHAR,
|
||||
quoting: Any = QUOTING,
|
||||
@@ -118,6 +125,7 @@ class CsvReader:
|
||||
self.__delimiter = delimiter
|
||||
self.__quotechar = quotechar
|
||||
self.__quoting = quoting
|
||||
self.__encoding = encoding
|
||||
self.header: Sequence[str] | None = None
|
||||
self.csv_file_reader = self.__open_csv()
|
||||
|
||||
@@ -129,9 +137,16 @@ class CsvReader:
|
||||
csv.DictReader | None: _description_
|
||||
"""
|
||||
try:
|
||||
# if UTF style check if this is BOM
|
||||
if self.__encoding.lower().startswith('utf-') and is_bom_encoded(self.__file_name):
|
||||
bom_info = is_bom_encoded_info(self.__file_name)
|
||||
if bom_info['encoding'] == 'utf-8':
|
||||
self.__encoding = ENCODING_UTF8_SIG
|
||||
else:
|
||||
self.__encoding = bom_info['encoding'] or self.__encoding
|
||||
fp = open(
|
||||
self.__file_name,
|
||||
"r", encoding="utf-8"
|
||||
"r", encoding=self.__encoding
|
||||
)
|
||||
csv_file_reader = csv.DictReader(
|
||||
fp,
|
||||
|
||||
76
src/corelibs/db_handling/sql_main.py
Normal file
76
src/corelibs/db_handling/sql_main.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""
|
||||
Main SQL base for any SQL calls
|
||||
This is a wrapper for SQLiteIO or other future DB Interfaces
|
||||
[Note: at the moment only SQLiteIO is implemented]
|
||||
- on class creation connection with ValueError on fail
|
||||
- connect method checks if already connected and warns
|
||||
- connection class fails with ValueError if not valid target is selected (SQL wrapper type)
|
||||
- connected check class method
|
||||
- a process class that returns data as list or False if end or error
|
||||
|
||||
TODO: adapt more CoreLibs DB IO class flow here
|
||||
"""
|
||||
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
from corelibs.debug_handling.debug_helpers import call_stack
|
||||
from corelibs.db_handling.sqlite_io import SQLiteIO
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
|
||||
|
||||
IDENT_SPLIT_CHARACTER: str = ':'
|
||||
|
||||
|
||||
class SQLMain:
|
||||
"""Main SQL interface class"""
|
||||
def __init__(self, log: 'Logger', db_ident: str):
|
||||
self.log = log
|
||||
self.dbh: SQLiteIO | None = None
|
||||
self.db_target: str | None = None
|
||||
self.connect(db_ident)
|
||||
if not self.connected():
|
||||
raise ValueError(f'Failed to connect to database [{call_stack()}]')
|
||||
|
||||
def connect(self, db_ident: str):
|
||||
"""setup basic connection"""
|
||||
if self.dbh is not None and self.dbh.conn is not None:
|
||||
self.log.warning(f"A database connection already exists for: {self.db_target} [{call_stack()}]")
|
||||
return
|
||||
self.db_target, db_dsn = db_ident.split(IDENT_SPLIT_CHARACTER)
|
||||
match self.db_target:
|
||||
case 'sqlite':
|
||||
# this is a Path only at the moment
|
||||
self.dbh = SQLiteIO(self.log, db_dsn, row_factory='Dict')
|
||||
case _:
|
||||
raise ValueError(f'SQL interface for {self.db_target} is not implemented [{call_stack()}]')
|
||||
if not self.dbh.db_connected():
|
||||
raise ValueError(f"DB Connection failed for: {self.db_target} [{call_stack()}]")
|
||||
|
||||
def close(self):
|
||||
"""close connection"""
|
||||
if self.dbh is None or not self.connected():
|
||||
return
|
||||
# self.log.info(f"Close DB Connection: {self.db_target} [{call_stack()}]")
|
||||
self.dbh.db_close()
|
||||
|
||||
def connected(self) -> bool:
|
||||
"""check connectuon"""
|
||||
if self.dbh is None or not self.dbh.db_connected():
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
return True
|
||||
|
||||
def process_query(
|
||||
self, query: str, params: tuple[Any, ...] | None = None
|
||||
) -> list[tuple[Any, ...]] | list[dict[str, Any]] | Literal[False]:
|
||||
"""mini wrapper for execute query"""
|
||||
if self.dbh is not None:
|
||||
result = self.dbh.execute_query(query, params)
|
||||
if result is False:
|
||||
return False
|
||||
else:
|
||||
self.log.error(f"Problem connecting to db: {self.db_target} [{call_stack()}]")
|
||||
return False
|
||||
return result
|
||||
|
||||
# __END__
|
||||
@@ -4,6 +4,8 @@ Send email wrapper
|
||||
|
||||
import smtplib
|
||||
from email.message import EmailMessage
|
||||
from email.header import Header
|
||||
from email.utils import formataddr, parseaddr
|
||||
from typing import TYPE_CHECKING, Any
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
@@ -133,21 +135,30 @@ class SendEmail:
|
||||
_subject = template["subject"]
|
||||
_body = template["body"]
|
||||
for key, value in replace.items():
|
||||
_subject = _subject.replace(f"{{{{{key}}}}}", value)
|
||||
_body = _body.replace(f"{{{{{key}}}}}", value)
|
||||
placeholder = f"{{{{{key}}}}}"
|
||||
_subject = _subject.replace(placeholder, value)
|
||||
_body = _body.replace(placeholder, value)
|
||||
name, addr = parseaddr(from_email)
|
||||
if name:
|
||||
# Encode the name part with MIME encoding
|
||||
encoded_name = str(Header(name, 'utf-8'))
|
||||
from_email_encoded = formataddr((encoded_name, addr))
|
||||
else:
|
||||
from_email_encoded = from_email
|
||||
# create a simple email and add subhect, from email
|
||||
msg_email = EmailMessage()
|
||||
# msg.set_content(_body, charset='utf-8', cte='quoted-printable')
|
||||
msg_email.set_content(_body, charset="utf-8")
|
||||
msg_email["Subject"] = _subject
|
||||
msg_email["From"] = from_email
|
||||
msg_email["From"] = from_email_encoded
|
||||
# push to array for sening
|
||||
msg.append(msg_email)
|
||||
return msg
|
||||
|
||||
def send_email_list(
|
||||
self,
|
||||
email: list[EmailMessage], receivers: list[str],
|
||||
emails: list[EmailMessage],
|
||||
receivers: list[str],
|
||||
combined_send: bool | None = None,
|
||||
test_only: bool | None = None
|
||||
):
|
||||
@@ -170,18 +181,27 @@ class SendEmail:
|
||||
smtp = smtplib.SMTP(smtp_host)
|
||||
except ConnectionRefusedError as e:
|
||||
self.log.error("Could not open SMTP connection to: %s, %s", smtp_host, e)
|
||||
# prepare receiver list
|
||||
receivers_encoded: list[str] = []
|
||||
for __receiver in receivers:
|
||||
to_name, to_addr = parseaddr(__receiver)
|
||||
if to_name:
|
||||
# Encode the name part with MIME encoding
|
||||
encoded_to_name = str(Header(to_name, 'utf-8'))
|
||||
receivers_encoded.append(formataddr((encoded_to_name, to_addr)))
|
||||
else:
|
||||
receivers_encoded.append(__receiver)
|
||||
# loop over messages and then over recievers
|
||||
for msg in email:
|
||||
for msg in emails:
|
||||
if combined_send is True:
|
||||
msg["To"] = ", ".join(receivers)
|
||||
msg["To"] = ", ".join(receivers_encoded)
|
||||
if not self.settings.get('test'):
|
||||
if smtp is not None:
|
||||
smtp.send_message(msg, msg["From"], receivers)
|
||||
smtp.send_message(msg, msg["From"], receivers_encoded)
|
||||
else:
|
||||
self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
|
||||
else:
|
||||
for receiver in receivers:
|
||||
# send to
|
||||
for receiver in receivers_encoded:
|
||||
self.log.debug(f"===> Send to: {receiver}")
|
||||
if "To" in msg:
|
||||
msg.replace_header("To", receiver)
|
||||
|
||||
@@ -4,11 +4,38 @@ Various dictionary, object and list hashers
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
from typing import Any
|
||||
from typing import Any, cast, Sequence
|
||||
|
||||
|
||||
def hash_object(obj: Any) -> str:
|
||||
"""
|
||||
RECOMMENDED for new use
|
||||
Create a hash for any dict or list with mixed key types
|
||||
|
||||
Arguments:
|
||||
obj {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
def normalize(o: Any) -> Any:
|
||||
if isinstance(o, dict):
|
||||
# Sort by repr of keys to handle mixed types (str, int, etc.)
|
||||
o = cast(dict[Any, Any], o)
|
||||
return tuple(sorted((repr(k), normalize(v)) for k, v in o.items()))
|
||||
if isinstance(o, (list, tuple)):
|
||||
o = cast(Sequence[Any], o)
|
||||
return tuple(normalize(item) for item in o)
|
||||
return repr(o)
|
||||
|
||||
normalized = normalize(obj)
|
||||
return hashlib.sha256(str(normalized).encode()).hexdigest()
|
||||
|
||||
|
||||
def dict_hash_frozen(data: dict[Any, Any]) -> int:
|
||||
"""
|
||||
NOT RECOMMENDED, use dict_hash_crc or hash_object instead
|
||||
If used, DO NOT CHANGE
|
||||
hash a dict via freeze
|
||||
|
||||
Args:
|
||||
@@ -22,18 +49,25 @@ def dict_hash_frozen(data: dict[Any, Any]) -> int:
|
||||
|
||||
def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str:
|
||||
"""
|
||||
Create a sha256 hash over dict
|
||||
LEGACY METHOD, must be kept for fallback, if used by other code, DO NOT CHANGE
|
||||
Create a sha256 hash over dict or list
|
||||
alternative for
|
||||
dict_hash_frozen
|
||||
|
||||
Args:
|
||||
data (dict | list): _description_
|
||||
data (dict[Any, Any] | list[Any]): _description_
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
str: sha256 hash, prefiex with HO_ if fallback used
|
||||
"""
|
||||
return hashlib.sha256(
|
||||
json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8')
|
||||
).hexdigest()
|
||||
try:
|
||||
return hashlib.sha256(
|
||||
# IT IS IMPORTANT THAT THE BELOW CALL STAYS THE SAME AND DOES NOT CHANGE OR WE WILL GET DIFFERENT HASHES
|
||||
# separators=(',', ':') to get rid of spaces, but if this is used the hash will be different, DO NOT ADD
|
||||
json.dumps(data, sort_keys=True, ensure_ascii=True, default=str).encode('utf-8')
|
||||
).hexdigest()
|
||||
except TypeError:
|
||||
# Fallback tod different hasher, will return DIFFERENT hash than above, so only usable in int/str key mixes
|
||||
return "HO_" + hash_object(data)
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
List type helpers
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Any, Sequence
|
||||
|
||||
|
||||
@@ -44,4 +45,31 @@ def is_list_in_list(
|
||||
# Get the difference and extract just the values
|
||||
return [item for item, _ in set_a - set_b]
|
||||
|
||||
|
||||
def make_unique_list_of_dicts(dict_list: list[Any]) -> list[Any]:
|
||||
"""
|
||||
Create a list of unique dictionary entries
|
||||
|
||||
Arguments:
|
||||
dict_list {list[Any]} -- _description_
|
||||
|
||||
Returns:
|
||||
list[Any] -- _description_
|
||||
"""
|
||||
try:
|
||||
# try json dumps, can fail with int and str index types
|
||||
return list(
|
||||
{
|
||||
json.dumps(d, sort_keys=True, ensure_ascii=True, separators=(',', ':')): d
|
||||
for d in dict_list
|
||||
}.values()
|
||||
)
|
||||
except TypeError:
|
||||
# Fallback for non-serializable entries, slow but works
|
||||
unique: list[Any] = []
|
||||
for d in dict_list:
|
||||
if d not in unique:
|
||||
unique.append(d)
|
||||
return unique
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -602,9 +602,9 @@ class Log(LogParent):
|
||||
__setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
|
||||
default_log_settings[__log_entry] = __setting
|
||||
# check console log type
|
||||
default_log_settings['console_format_type'] = cast('ConsoleFormat', log_settings.get(
|
||||
'console_format_type', self.DEFAULT_LOG_SETTINGS['console_format_type']
|
||||
))
|
||||
if (console_format_type := log_settings.get('console_format_type')) is None:
|
||||
console_format_type = self.DEFAULT_LOG_SETTINGS['console_format_type']
|
||||
default_log_settings['console_format_type'] = cast('ConsoleFormat', console_format_type)
|
||||
# check log queue
|
||||
__setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue'])
|
||||
if __setting is not None:
|
||||
@@ -774,6 +774,16 @@ class Log(LogParent):
|
||||
self.__set_console_formatter(console_format_type)
|
||||
)
|
||||
|
||||
def get_console_formatter(self) -> ConsoleFormat:
|
||||
"""
|
||||
Get the current console formatter, this the settings type
|
||||
Note that if eg "ALL" is set it will return the combined information but not the ALL flag name itself
|
||||
|
||||
Returns:
|
||||
ConsoleFormat -- _description_
|
||||
"""
|
||||
return self.log_settings['console_format_type']
|
||||
|
||||
# MARK: console handler
|
||||
def __create_console_handler(
|
||||
self, handler_name: str,
|
||||
|
||||
0
src/corelibs/math_handling/__init__.py
Normal file
0
src/corelibs/math_handling/__init__.py
Normal file
35
src/corelibs/math_handling/math_helpers.py
Normal file
35
src/corelibs/math_handling/math_helpers.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""
|
||||
Various math helpers
|
||||
"""
|
||||
|
||||
import math
|
||||
|
||||
|
||||
def gcd(a: int, b: int):
|
||||
"""
|
||||
Calculate: Greatest Common Divisor
|
||||
|
||||
Arguments:
|
||||
a {int} -- _description_
|
||||
b {int} -- _description_
|
||||
|
||||
Returns:
|
||||
_type_ -- _description_
|
||||
"""
|
||||
return math.gcd(a, b)
|
||||
|
||||
|
||||
def lcd(a: int, b: int):
|
||||
"""
|
||||
Calculate: Least Common Denominator
|
||||
|
||||
Arguments:
|
||||
a {int} -- _description_
|
||||
b {int} -- _description_
|
||||
|
||||
Returns:
|
||||
_type_ -- _description_
|
||||
"""
|
||||
return math.lcm(a, b)
|
||||
|
||||
# __END__
|
||||
@@ -12,7 +12,7 @@ class EnumBase(CorelibsEnumBase):
|
||||
|
||||
.. deprecated::
|
||||
Use corelibs_enum_base.EnumBase instead
|
||||
DEPRECATED: Use corelibs_enum_base.EnumBase instead
|
||||
DEPRECATED: Use corelibs_enum_base.enum_base.EnumBase instead
|
||||
|
||||
lookup_any and from_any will return "EnumBase" and the sub class name
|
||||
run the return again to "from_any" to get a clean value, or cast it
|
||||
@@ -20,6 +20,6 @@ class EnumBase(CorelibsEnumBase):
|
||||
|
||||
|
||||
# At the module level, issue a deprecation warning
|
||||
warnings.warn("Use corelibs_enum_base.EnumBase instead", DeprecationWarning, stacklevel=2)
|
||||
warnings.warn("Use corelibs_enum_base.enum_base.EnumBase instead", DeprecationWarning, stacklevel=2)
|
||||
|
||||
# __EMD__
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing_extensions import deprecated
|
||||
from corelibs_enum_base.enum_base import EnumBase as CorelibsEnumBase
|
||||
|
||||
|
||||
@deprecated("Use corelibs_enum_base.EnumBase instead")
|
||||
@deprecated("Use corelibs_enum_base.enum_base.EnumBase instead")
|
||||
class EnumBase(CorelibsEnumBase):
|
||||
"""
|
||||
base for enum
|
||||
|
||||
109
test-run/check_handling/regex_checks.py
Normal file
109
test-run/check_handling/regex_checks.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""
|
||||
Test check andling for regex checks
|
||||
"""
|
||||
|
||||
from corelibs_text_colors.text_colors import Colors
|
||||
from corelibs.check_handling.regex_constants import (
|
||||
compile_re, DOMAIN_WITH_LOCALHOST_REGEX, EMAIL_BASIC_REGEX, NAME_EMAIL_BASIC_REGEX, SUB_EMAIL_BASIC_REGEX
|
||||
)
|
||||
from corelibs.check_handling.regex_constants_compiled import (
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX, COMPILED_EMAIL_BASIC_REGEX,
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX, COMPILED_NAME_EMAIL_BASIC_REGEX
|
||||
)
|
||||
|
||||
NAME_EMAIL_SIMPLE_REGEX = r"""
|
||||
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
|
||||
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
|
||||
<(?P<email3>[^>]+)>|
|
||||
(?P<email4>[^\s<>]+))\s*$
|
||||
"""
|
||||
|
||||
|
||||
def domain_test():
|
||||
"""
|
||||
domain regex test
|
||||
"""
|
||||
print("=" * 30)
|
||||
test_domains = [
|
||||
"example.com",
|
||||
"localhost",
|
||||
"subdomain.localhost",
|
||||
"test.localhost.com",
|
||||
"some-domain.org"
|
||||
]
|
||||
|
||||
regex_domain_check = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
|
||||
print(f"REGEX: {DOMAIN_WITH_LOCALHOST_REGEX}")
|
||||
print(f"Check regex: {regex_domain_check.search('localhost')}")
|
||||
|
||||
for domain in test_domains:
|
||||
if regex_domain_check.search(domain):
|
||||
print(f"Matched: {domain}")
|
||||
else:
|
||||
print(f"Did not match: {domain}")
|
||||
|
||||
|
||||
def email_test():
|
||||
"""
|
||||
email regex test
|
||||
"""
|
||||
print("=" * 30)
|
||||
email_list = """
|
||||
e@bar.com
|
||||
<f@foobar.com>
|
||||
"Master" <foobar@bar.com>
|
||||
"not valid" not@valid.com
|
||||
also not valid not@valid.com
|
||||
some header <something@bar.com>
|
||||
test master <master@master.com>
|
||||
日本語 <japan@jp.net>
|
||||
"ひほん カケ苦" <foo@bar.com>
|
||||
single@entry.com
|
||||
arsch@popsch.com
|
||||
test open <open@open.com>
|
||||
"""
|
||||
|
||||
print(f"REGEX: SUB_EMAIL_BASIC_REGEX: {SUB_EMAIL_BASIC_REGEX}")
|
||||
print(f"REGEX: EMAIL_BASIC_REGEX: {EMAIL_BASIC_REGEX}")
|
||||
print(f"REGEX: COMPILED_NAME_EMAIL_SIMPLE_REGEX: {COMPILED_NAME_EMAIL_SIMPLE_REGEX}")
|
||||
print(f"REGEX: NAME_EMAIL_BASIC_REGEX: {NAME_EMAIL_BASIC_REGEX}")
|
||||
|
||||
basic_email = COMPILED_EMAIL_BASIC_REGEX
|
||||
sub_basic_email = compile_re(SUB_EMAIL_BASIC_REGEX)
|
||||
simple_name_email_regex = COMPILED_NAME_EMAIL_SIMPLE_REGEX
|
||||
full_name_email_regex = COMPILED_NAME_EMAIL_BASIC_REGEX
|
||||
for email in email_list.splitlines():
|
||||
email = email.strip()
|
||||
if not email:
|
||||
continue
|
||||
print(f">>> Testing: {email}")
|
||||
if not basic_email.match(email):
|
||||
print(f"{Colors.red}[EMAIL ] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[EMAIL ] Matched : {email}{Colors.reset}")
|
||||
if not sub_basic_email.match(email):
|
||||
print(f"{Colors.red}[SUB ] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[SUB ] Matched : {email}{Colors.reset}")
|
||||
if not simple_name_email_regex.match(email):
|
||||
print(f"{Colors.red}[SIMPLE] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[SIMPLE] Matched : {email}{Colors.reset}")
|
||||
if not full_name_email_regex.match(email):
|
||||
print(f"{Colors.red}[FULL ] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[FULL ] Matched : {email}{Colors.reset}")
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Test regex checks
|
||||
"""
|
||||
domain_test()
|
||||
email_test()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# __END__
|
||||
@@ -12,10 +12,12 @@ some_match_list=foo,bar
|
||||
test_list=a,b,c,d f, g h
|
||||
other_list=a|b|c|d|
|
||||
third_list=xy|ab|df|fg
|
||||
empty_list=
|
||||
str_length=foobar
|
||||
int_range=20
|
||||
int_range_not_set=
|
||||
int_range_not_set_empty_set=5
|
||||
bool_var=True
|
||||
#
|
||||
match_target=foo
|
||||
match_target_list=foo,bar,baz
|
||||
@@ -37,3 +39,6 @@ email_bad=gii@bar.com
|
||||
[LoadTest]
|
||||
a.b.c=foo
|
||||
d:e:f=bar
|
||||
|
||||
[ErrorTest]
|
||||
some_value=42
|
||||
|
||||
@@ -21,11 +21,6 @@ def main():
|
||||
Main run
|
||||
"""
|
||||
|
||||
value = "2025/1/1"
|
||||
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
|
||||
result = regex_c.search(value)
|
||||
print(f"regex {regex_c} check against {value} -> {result}")
|
||||
|
||||
# for log testing
|
||||
log = Log(
|
||||
log_path=ROOT_PATH.joinpath(LOG_DIR, 'settings_loader.log'),
|
||||
@@ -37,6 +32,11 @@ def main():
|
||||
)
|
||||
log.logger.info('Settings loader')
|
||||
|
||||
value = "2025/1/1"
|
||||
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
|
||||
result = regex_c.search(value)
|
||||
log.info(f"regex {regex_c} check against {value} -> {result}")
|
||||
|
||||
sl = SettingsLoader(
|
||||
{
|
||||
'overload_from_args': 'OVERLOAD from ARGS',
|
||||
@@ -69,6 +69,9 @@ def main():
|
||||
"split:|",
|
||||
"check:string.alphanumeric"
|
||||
],
|
||||
"empty_list": [
|
||||
"split:,",
|
||||
],
|
||||
"str_length": [
|
||||
"length:2-10"
|
||||
],
|
||||
@@ -81,6 +84,7 @@ def main():
|
||||
"int_range_not_set_empty_set": [
|
||||
"empty:"
|
||||
],
|
||||
"bool_var": ["convert:bool"],
|
||||
"match_target": ["matching:foo"],
|
||||
"match_target_list": ["split:,", "matching:foo|bar|baz",],
|
||||
"match_source_a": ["in:match_target"],
|
||||
@@ -125,6 +129,20 @@ def main():
|
||||
except ValueError as e:
|
||||
print(f"Could not load settings: {e}")
|
||||
|
||||
try:
|
||||
config_load = 'ErrorTest'
|
||||
config_data = sl.load_settings(
|
||||
config_load,
|
||||
{
|
||||
"some_value": [
|
||||
"check:string.email.basic",
|
||||
],
|
||||
}
|
||||
)
|
||||
print(f"[{config_load}] Load: {config_load} -> {dump_data(config_data)}")
|
||||
except ValueError as e:
|
||||
print(f"Could not load settings: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
139
test-run/db_handling/sql_main.py
Normal file
139
test-run/db_handling/sql_main.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""
|
||||
SQL Main wrapper test
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
import json
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs.logging_handling.log import Log, Logger
|
||||
from corelibs.db_handling.sql_main import SQLMain
|
||||
|
||||
SCRIPT_PATH: Path = Path(__file__).resolve().parent
|
||||
ROOT_PATH: Path = SCRIPT_PATH
|
||||
DATABASE_DIR: Path = Path("database")
|
||||
LOG_DIR: Path = Path("log")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Comment
|
||||
"""
|
||||
log = Log(
|
||||
log_path=ROOT_PATH.joinpath(LOG_DIR, 'sqlite_main.log'),
|
||||
log_name="SQLite Main",
|
||||
log_settings={
|
||||
"log_level_console": 'DEBUG',
|
||||
"log_level_file": 'DEBUG',
|
||||
}
|
||||
)
|
||||
sql_main = SQLMain(
|
||||
log=Logger(log.get_logger_settings()),
|
||||
db_ident=f"sqlite:{ROOT_PATH.joinpath(DATABASE_DIR, 'test_sqlite_main.db')}"
|
||||
)
|
||||
if sql_main.connected():
|
||||
log.info("SQL Main connected successfully")
|
||||
else:
|
||||
log.error('SQL Main connection failed')
|
||||
if sql_main.dbh is None:
|
||||
log.error('SQL Main DBH instance is None')
|
||||
return
|
||||
|
||||
if sql_main.dbh.trigger_exists('trg_test_a_set_date_updated_on_update'):
|
||||
log.info("Trigger trg_test_a_set_date_updated_on_update exists")
|
||||
if sql_main.dbh.table_exists('test_a'):
|
||||
log.info("Table test_a exists, dropping for clean test")
|
||||
sql_main.dbh.execute_query("DROP TABLE test_a;")
|
||||
# create a dummy table
|
||||
table_sql = """
|
||||
CREATE TABLE IF NOT EXISTS test_a (
|
||||
test_a_id INTEGER PRIMARY KEY,
|
||||
date_created TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
||||
date_updated TEXT,
|
||||
uid TEXT NOT NULL UNIQUE,
|
||||
set_current_timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
|
||||
text_a TEXT,
|
||||
content,
|
||||
int_a INTEGER,
|
||||
float_a REAL
|
||||
);
|
||||
"""
|
||||
|
||||
result = sql_main.dbh.execute_query(table_sql)
|
||||
log.debug(f"Create table result: {result}")
|
||||
trigger_sql = """
|
||||
CREATE TRIGGER trg_test_a_set_date_updated_on_update
|
||||
AFTER UPDATE ON test_a
|
||||
FOR EACH ROW
|
||||
WHEN OLD.date_updated IS NULL OR NEW.date_updated = OLD.date_updated
|
||||
BEGIN
|
||||
UPDATE test_a
|
||||
SET date_updated = (strftime('%Y-%m-%d %H:%M:%f', 'now'))
|
||||
WHERE test_a_id = NEW.test_a_id;
|
||||
END;
|
||||
"""
|
||||
result = sql_main.dbh.execute_query(trigger_sql)
|
||||
log.debug(f"Create trigger result: {result}")
|
||||
result = sql_main.dbh.meta_data_detail('test_a')
|
||||
log.debug(f"Table meta data detail: {dump_data(result)}")
|
||||
# INSERT DATA
|
||||
sql = """
|
||||
INSERT INTO test_a (uid, text_a, content, int_a, float_a)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
RETURNING test_a_id, uid;
|
||||
"""
|
||||
result = sql_main.dbh.execute_query(
|
||||
sql,
|
||||
(
|
||||
str(uuid4()),
|
||||
'Some text A',
|
||||
json.dumps({'foo': 'bar', 'number': 42}),
|
||||
123,
|
||||
123.456,
|
||||
)
|
||||
)
|
||||
log.debug(f"[1] Insert data result: {dump_data(result)}")
|
||||
__uid: str = ''
|
||||
if result is not False:
|
||||
# first one only of interest
|
||||
result = dict(result[0])
|
||||
__uid = str(result.get('uid', ''))
|
||||
# second insert
|
||||
result = sql_main.dbh.execute_query(
|
||||
sql,
|
||||
(
|
||||
str(uuid4()),
|
||||
'Some text A',
|
||||
json.dumps({'foo': 'bar', 'number': 42}),
|
||||
123,
|
||||
123.456,
|
||||
)
|
||||
)
|
||||
log.debug(f"[2] Insert data result: {dump_data(result)}")
|
||||
result = sql_main.dbh.execute_query("SELECT * FROM test_a;")
|
||||
log.debug(f"Select data result: {dump_data(result)}")
|
||||
result = sql_main.dbh.return_one("SELECT * FROM test_a WHERE uid = ?;", (__uid,))
|
||||
log.debug(f"Fetch row result: {dump_data(result)}")
|
||||
sql = """
|
||||
UPDATE test_a
|
||||
SET text_a = ?
|
||||
WHERE uid = ?;
|
||||
"""
|
||||
result = sql_main.dbh.execute_query(
|
||||
sql,
|
||||
(
|
||||
'Some updated text A',
|
||||
__uid,
|
||||
)
|
||||
)
|
||||
log.debug(f"Update data result: {dump_data(result)}")
|
||||
result = sql_main.dbh.return_one("SELECT * FROM test_a WHERE uid = ?;", (__uid,))
|
||||
log.debug(f"Fetch row after update result: {dump_data(result)}")
|
||||
|
||||
sql_main.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# __END__
|
||||
@@ -1,7 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Main comment
|
||||
SQLite IO test
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
@@ -24,12 +24,19 @@ def main() -> None:
|
||||
"lookup_value_c": "B02",
|
||||
"replace_value": "R02",
|
||||
},
|
||||
{
|
||||
"lookup_value_p": "A03",
|
||||
"lookup_value_c": "B03",
|
||||
"replace_value": "R03",
|
||||
},
|
||||
]
|
||||
test_foo = ArraySearchList(
|
||||
key = "lookup_value_p",
|
||||
value = "A01"
|
||||
key="lookup_value_p",
|
||||
value="A01"
|
||||
)
|
||||
print(test_foo)
|
||||
result = find_in_array_from_list(data, [test_foo])
|
||||
print(f"Search A: {dump_data(test_foo)} -> {dump_data(result)}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
@@ -38,12 +45,122 @@ def main() -> None:
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": "B01"
|
||||
},
|
||||
]
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search B: {dump_data(search)} -> {dump_data(result)}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": "A01"
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": "B01"
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": "B02"
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search C: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search C raised KeyError: {e}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": "A01"
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": ["B01", "B02"]
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search D: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search D raised KeyError: {e}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": ["A01", "A03"]
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": ["B01", "B02"]
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search E: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search E raised KeyError: {e}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": "NOT FOUND"
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search F: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search F raised KeyError: {e}")
|
||||
|
||||
data = [
|
||||
{
|
||||
"sd_user_id": "1593",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1592",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1596",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1594",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1595",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1861",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1862",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1860",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
}
|
||||
]
|
||||
|
||||
result = find_in_array_from_list(data, search)
|
||||
|
||||
print(f"Search {dump_data(search)} -> {dump_data(result)}")
|
||||
result = find_in_array_from_list(data, [ArraySearchList(
|
||||
key="sd_user_id",
|
||||
value="1593"
|
||||
)])
|
||||
print(f"Search F: -> {dump_data(result)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
test list helpers
|
||||
"""
|
||||
|
||||
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list
|
||||
from typing import Any
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list, make_unique_list_of_dicts
|
||||
from corelibs.iterator_handling.fingerprint import dict_hash_crc
|
||||
|
||||
|
||||
def __test_is_list_in_list_a():
|
||||
@@ -18,9 +21,66 @@ def __convert_list():
|
||||
print(f"IN: {source} -> {result}")
|
||||
|
||||
|
||||
def __make_unique_list_of_dicts():
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
|
||||
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
|
||||
{"b": 2, "a": 1, "nested": {"y": 20, "x": 10}},
|
||||
{"b": 2, "a": 1, "nested": {"y": 20, "x": 30}},
|
||||
{"a": 3, "b": 4, "nested": {"x": 30, "y": 40}}
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list = [
|
||||
{"a": 1, 1: "one"},
|
||||
{1: "one", "a": 1},
|
||||
{"a": 2, 1: "one"}
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list = [
|
||||
{"a": 1, "b": [1, 2, 3]},
|
||||
{"b": [1, 2, 3], "a": 1},
|
||||
{"a": 1, "b": [1, 2, 4]},
|
||||
1, 2, "String", 1, "Foobar"
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list: list[Any] = [
|
||||
[],
|
||||
{},
|
||||
[],
|
||||
{},
|
||||
{"a": []},
|
||||
{"a": []},
|
||||
{"a": {}},
|
||||
{"a": {}},
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list: list[Any] = [
|
||||
(1, 2),
|
||||
(1, 2),
|
||||
(2, 3),
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
|
||||
def main():
|
||||
"""List helpers test runner"""
|
||||
__test_is_list_in_list_a()
|
||||
__convert_list()
|
||||
__make_unique_list_of_dicts()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -27,7 +27,8 @@ def main():
|
||||
"per_run_log": True,
|
||||
# "console_format_type": ConsoleFormatSettings.NONE,
|
||||
# "console_format_type": ConsoleFormatSettings.MINIMAL,
|
||||
"console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL,
|
||||
# "console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL,
|
||||
"console_format_type": None,
|
||||
# "console_format_type": ConsoleFormat.NAME,
|
||||
# "console_format_type": (
|
||||
# ConsoleFormat.TIME | ConsoleFormat.TIMEZONE | ConsoleFormat.LINENO | ConsoleFormat.LEVEL
|
||||
@@ -121,10 +122,16 @@ def main():
|
||||
|
||||
log.set_log_level(Log.CONSOLE_HANDLER, LoggingLevel.DEBUG)
|
||||
log.debug('Current logging format: %s', log.log_settings['console_format_type'])
|
||||
log.debug('Current console formatter: %s', log.get_console_formatter())
|
||||
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
|
||||
log.info('Does hit show less')
|
||||
log.info('Does hit show less A')
|
||||
log.debug('Current console formatter after A: %s', log.get_console_formatter())
|
||||
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
|
||||
log.info('Does hit show less B')
|
||||
log.debug('Current console formatter after B: %s', log.get_console_formatter())
|
||||
log.update_console_formatter(ConsoleFormatSettings.ALL)
|
||||
log.info('Does hit show less C')
|
||||
log.debug('Current console formatter after C: %s', log.get_console_formatter())
|
||||
print(f"*** Any handler is minimum level ERROR: {log.any_handler_is_minimum_level(LoggingLevel.ERROR)}")
|
||||
print(f"*** Any handler is minimum level DEBUG: {log.any_handler_is_minimum_level(LoggingLevel.DEBUG)}")
|
||||
|
||||
|
||||
@@ -8,10 +8,21 @@ import re
|
||||
import pytest
|
||||
from corelibs.check_handling.regex_constants import (
|
||||
compile_re,
|
||||
SUB_EMAIL_BASIC_REGEX,
|
||||
EMAIL_BASIC_REGEX,
|
||||
NAME_EMAIL_SIMPLE_REGEX,
|
||||
NAME_EMAIL_BASIC_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
DOMAIN_REGEX,
|
||||
DOMAIN_REGEX
|
||||
)
|
||||
from corelibs.check_handling.regex_constants_compiled import (
|
||||
COMPILED_EMAIL_BASIC_REGEX,
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
|
||||
COMPILED_NAME_EMAIL_BASIC_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
COMPILED_DOMAIN_REGEX,
|
||||
)
|
||||
|
||||
|
||||
@@ -48,7 +59,7 @@ class TestEmailBasicRegex:
|
||||
@pytest.fixture
|
||||
def email_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled email regex pattern."""
|
||||
return compile_re(EMAIL_BASIC_REGEX)
|
||||
return COMPILED_EMAIL_BASIC_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_email", [
|
||||
"user@example.com",
|
||||
@@ -123,13 +134,272 @@ class TestEmailBasicRegex:
|
||||
assert not email_pattern.match(email)
|
||||
|
||||
|
||||
class TestSubEmailBasicRegex:
|
||||
"""Test cases for SUB_EMAIL_BASIC_REGEX pattern (without anchors)."""
|
||||
|
||||
@pytest.fixture
|
||||
def sub_email_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled sub email regex pattern."""
|
||||
return compile_re(rf"^{SUB_EMAIL_BASIC_REGEX}$")
|
||||
|
||||
@pytest.mark.parametrize("valid_email", [
|
||||
"user@example.com",
|
||||
"test.user@example.com",
|
||||
"user+tag@example.co.uk",
|
||||
"first.last@subdomain.example.com",
|
||||
"user123@test-domain.com",
|
||||
"a@example.com",
|
||||
"user_name@example.com",
|
||||
"user-name@example.com",
|
||||
"user@sub.domain.example.com",
|
||||
"test!#$%&'*+-/=?^_`{|}~@example.com",
|
||||
"1234567890@example.com",
|
||||
])
|
||||
def test_valid_emails_match(self, sub_email_pattern: re.Pattern[str], valid_email: str) -> None:
|
||||
"""Test that valid email addresses match SUB_EMAIL_BASIC_REGEX."""
|
||||
assert sub_email_pattern.match(valid_email), (
|
||||
f"Failed to match valid email: {valid_email}"
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize("invalid_email", [
|
||||
"",
|
||||
"@example.com",
|
||||
"user@",
|
||||
"user",
|
||||
"user@.com",
|
||||
"user@domain",
|
||||
"user @example.com",
|
||||
".user@example.com",
|
||||
"user@-example.com",
|
||||
"user@example-.com",
|
||||
"user@example.c",
|
||||
"user@example.toolong",
|
||||
])
|
||||
def test_invalid_emails_no_match(self, sub_email_pattern: re.Pattern[str], invalid_email: str) -> None:
|
||||
"""Test that invalid emails don't match SUB_EMAIL_BASIC_REGEX."""
|
||||
assert not sub_email_pattern.match(invalid_email), (
|
||||
f"Incorrectly matched invalid email: {invalid_email}"
|
||||
)
|
||||
|
||||
def test_sub_email_max_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
|
||||
"""Test email with maximum local part length (64 characters)."""
|
||||
local_part = "a" * 64
|
||||
email = f"{local_part}@example.com"
|
||||
assert sub_email_pattern.match(email)
|
||||
|
||||
def test_sub_email_exceeds_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
|
||||
"""Test email exceeding maximum local part length."""
|
||||
local_part = "a" * 65
|
||||
email = f"{local_part}@example.com"
|
||||
assert not sub_email_pattern.match(email)
|
||||
|
||||
|
||||
class TestNameEmailSimpleRegex:
|
||||
"""Test cases for NAME_EMAIL_SIMPLE_REGEX pattern."""
|
||||
|
||||
@pytest.fixture
|
||||
def name_email_simple_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled name+email simple regex pattern."""
|
||||
return COMPILED_NAME_EMAIL_SIMPLE_REGEX
|
||||
|
||||
@pytest.mark.parametrize("test_input,expected_groups", [
|
||||
('"John Doe" <john@example.com>', {'name1': 'John Doe', 'email1': 'john@example.com'}),
|
||||
('John Doe <john@example.com>', {'name2': 'John Doe', 'email2': 'john@example.com'}),
|
||||
('<john@example.com>', {'email3': 'john@example.com'}),
|
||||
('john@example.com', {'email4': 'john@example.com'}),
|
||||
(' "Jane Smith" <jane@test.com> ', {'name1': 'Jane Smith', 'email1': 'jane@test.com'}),
|
||||
('Bob <bob@test.org>', {'name2': 'Bob', 'email2': 'bob@test.org'}),
|
||||
])
|
||||
def test_valid_name_email_combinations(
|
||||
self, name_email_simple_pattern: re.Pattern[str], test_input: str, expected_groups: dict[str, str]
|
||||
) -> None:
|
||||
"""Test that valid name+email combinations match and extract correct groups."""
|
||||
match = name_email_simple_pattern.match(test_input)
|
||||
assert match is not None, f"Failed to match: {test_input}"
|
||||
|
||||
# Check that expected groups are present and match
|
||||
for group_name, expected_value in expected_groups.items():
|
||||
assert match.group(group_name) == expected_value, (
|
||||
f"Group {group_name} expected '{expected_value}', got '{match.group(group_name)}'"
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize("invalid_input", [
|
||||
"",
|
||||
"not an email",
|
||||
"<>",
|
||||
'"Name Only"',
|
||||
'Name <',
|
||||
'<email',
|
||||
'Name <<email@test.com>>',
|
||||
'Name <email@test.com',
|
||||
'Name email@test.com>',
|
||||
])
|
||||
def test_invalid_name_email_combinations(
|
||||
self, name_email_simple_pattern: re.Pattern[str], invalid_input: str
|
||||
) -> None:
|
||||
"""Test that invalid inputs don't match NAME_EMAIL_SIMPLE_REGEX."""
|
||||
assert not name_email_simple_pattern.match(invalid_input), (
|
||||
f"Incorrectly matched invalid input: {invalid_input}"
|
||||
)
|
||||
|
||||
def test_extract_name_from_quoted(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test extracting name from quoted format."""
|
||||
match = name_email_simple_pattern.match('"Alice Wonder" <alice@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'Alice Wonder'
|
||||
assert match.group('email1') == 'alice@example.com'
|
||||
|
||||
def test_extract_name_from_unquoted(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test extracting name from unquoted format."""
|
||||
match = name_email_simple_pattern.match('Bob Builder <bob@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name2') == 'Bob Builder'
|
||||
assert match.group('email2') == 'bob@example.com'
|
||||
|
||||
def test_email_only_in_brackets(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test email-only format in angle brackets."""
|
||||
match = name_email_simple_pattern.match('<charlie@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('email3') == 'charlie@example.com'
|
||||
|
||||
def test_email_only_plain(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test plain email format without brackets."""
|
||||
match = name_email_simple_pattern.match('dave@example.com')
|
||||
assert match is not None
|
||||
assert match.group('email4') == 'dave@example.com'
|
||||
|
||||
def test_whitespace_handling(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test that leading/trailing whitespace is handled correctly."""
|
||||
match = name_email_simple_pattern.match(' "User Name" <user@example.com> ')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'User Name'
|
||||
assert match.group('email1') == 'user@example.com'
|
||||
|
||||
|
||||
class TestNameEmailBasicRegex:
|
||||
"""Test cases for NAME_EMAIL_BASIC_REGEX pattern with strict email validation."""
|
||||
|
||||
@pytest.fixture
|
||||
def name_email_basic_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled name+email basic regex pattern."""
|
||||
return COMPILED_NAME_EMAIL_BASIC_REGEX
|
||||
|
||||
@pytest.mark.parametrize("test_input,expected_name,expected_email", [
|
||||
('"John Doe" <john@example.com>', 'John Doe', 'john@example.com'),
|
||||
('John Doe <john@example.com>', 'John Doe', 'john@example.com'),
|
||||
('<john@example.com>', None, 'john@example.com'),
|
||||
('john@example.com', None, 'john@example.com'),
|
||||
(' "Jane Smith" <jane.smith@test.co.uk> ', 'Jane Smith', 'jane.smith@test.co.uk'),
|
||||
('Alice Wonder <alice+tag@example.com>', 'Alice Wonder', 'alice+tag@example.com'),
|
||||
])
|
||||
def test_valid_name_email_with_validation(
|
||||
self,
|
||||
name_email_basic_pattern: re.Pattern[str],
|
||||
test_input: str,
|
||||
expected_name: str | None,
|
||||
expected_email: str,
|
||||
) -> None:
|
||||
"""Test valid name+email with strict email validation."""
|
||||
match = name_email_basic_pattern.match(test_input)
|
||||
assert match is not None, f"Failed to match: {test_input}"
|
||||
|
||||
# Extract name and email from whichever group matched
|
||||
name = match.group('name1') or match.group('name2')
|
||||
email = (
|
||||
match.group('email1') or match.group('email2') or
|
||||
match.group('email3') or match.group('email4')
|
||||
)
|
||||
|
||||
assert name == expected_name, f"Expected name '{expected_name}', got '{name}'"
|
||||
assert email == expected_email, f"Expected email '{expected_email}', got '{email}'"
|
||||
|
||||
@pytest.mark.parametrize("invalid_input", [
|
||||
'"John Doe" <invalid.email>', # invalid email format
|
||||
'John Doe <@example.com>', # missing local part
|
||||
'<user@>', # missing domain
|
||||
'user@domain', # no TLD
|
||||
'"Name" <user @example.com>', # space in email
|
||||
'<.user@example.com>', # starts with dot
|
||||
'user@-example.com', # domain starts with hyphen
|
||||
'Name <user@example.c>', # TLD too short
|
||||
'Name <user@example.toolongdomain>', # TLD too long
|
||||
])
|
||||
def test_invalid_email_format_rejected(
|
||||
self, name_email_basic_pattern: re.Pattern[str], invalid_input: str
|
||||
) -> None:
|
||||
"""Test that inputs with invalid email formats are rejected."""
|
||||
assert not name_email_basic_pattern.match(invalid_input), (
|
||||
f"Incorrectly matched invalid input: {invalid_input}"
|
||||
)
|
||||
|
||||
def test_quoted_name_with_valid_email(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test quoted name format with valid email."""
|
||||
match = name_email_basic_pattern.match('"Alice Wonder" <alice@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'Alice Wonder'
|
||||
assert match.group('email1') == 'alice@example.com'
|
||||
|
||||
def test_unquoted_name_with_valid_email(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test unquoted name format with valid email."""
|
||||
match = name_email_basic_pattern.match('Bob Builder <bob@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name2') == 'Bob Builder'
|
||||
assert match.group('email2') == 'bob@example.com'
|
||||
|
||||
def test_email_only_formats(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test email-only formats (with and without brackets)."""
|
||||
# With brackets
|
||||
match1 = name_email_basic_pattern.match('<charlie@example.com>')
|
||||
assert match1 is not None
|
||||
assert match1.group('email3') == 'charlie@example.com'
|
||||
|
||||
# Without brackets
|
||||
match2 = name_email_basic_pattern.match('dave@example.com')
|
||||
assert match2 is not None
|
||||
assert match2.group('email4') == 'dave@example.com'
|
||||
|
||||
def test_whitespace_handling(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test that leading/trailing whitespace is handled correctly."""
|
||||
match = name_email_basic_pattern.match(' "User" <user@example.com> ')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'User'
|
||||
assert match.group('email1') == 'user@example.com'
|
||||
|
||||
def test_special_characters_in_local_part(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test email with special characters in local part."""
|
||||
match = name_email_basic_pattern.match('Test User <test!#$%&\'*+-/=?^_`{|}~@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name2') == 'Test User'
|
||||
assert match.group('email2') == 'test!#$%&\'*+-/=?^_`{|}~@example.com'
|
||||
|
||||
|
||||
class TestDomainWithLocalhostRegex:
|
||||
"""Test cases for DOMAIN_WITH_LOCALHOST_REGEX pattern."""
|
||||
|
||||
@pytest.fixture
|
||||
def domain_localhost_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled domain with localhost regex pattern."""
|
||||
return compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
|
||||
return COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_domain", [
|
||||
"localhost",
|
||||
@@ -181,7 +451,7 @@ class TestDomainWithLocalhostPortRegex:
|
||||
@pytest.fixture
|
||||
def domain_localhost_port_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled domain and localhost with port pattern."""
|
||||
return compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
|
||||
return COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_domain", [
|
||||
"localhost",
|
||||
@@ -247,7 +517,7 @@ class TestDomainRegex:
|
||||
@pytest.fixture
|
||||
def domain_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled domain regex pattern."""
|
||||
return compile_re(DOMAIN_REGEX)
|
||||
return COMPILED_DOMAIN_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_domain", [
|
||||
"example.com",
|
||||
@@ -306,6 +576,8 @@ class TestRegexPatternConsistency:
|
||||
"""Test that all regex patterns can be compiled without errors."""
|
||||
patterns = [
|
||||
EMAIL_BASIC_REGEX,
|
||||
NAME_EMAIL_SIMPLE_REGEX,
|
||||
NAME_EMAIL_BASIC_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
DOMAIN_REGEX,
|
||||
@@ -314,9 +586,24 @@ class TestRegexPatternConsistency:
|
||||
compiled = compile_re(pattern)
|
||||
assert isinstance(compiled, re.Pattern)
|
||||
|
||||
def test_compiled_patterns_are_patterns(self) -> None:
|
||||
"""Test that all COMPILED_ constants are Pattern objects."""
|
||||
compiled_patterns = [
|
||||
COMPILED_EMAIL_BASIC_REGEX,
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
|
||||
COMPILED_NAME_EMAIL_BASIC_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
COMPILED_DOMAIN_REGEX,
|
||||
]
|
||||
for pattern in compiled_patterns:
|
||||
assert isinstance(pattern, re.Pattern)
|
||||
|
||||
def test_domain_patterns_are_strings(self) -> None:
|
||||
"""Test that all regex constants are strings."""
|
||||
assert isinstance(EMAIL_BASIC_REGEX, str)
|
||||
assert isinstance(NAME_EMAIL_SIMPLE_REGEX, str)
|
||||
assert isinstance(NAME_EMAIL_BASIC_REGEX, str)
|
||||
assert isinstance(DOMAIN_WITH_LOCALHOST_REGEX, str)
|
||||
assert isinstance(DOMAIN_WITH_LOCALHOST_PORT_REGEX, str)
|
||||
assert isinstance(DOMAIN_REGEX, str)
|
||||
@@ -325,8 +612,8 @@ class TestRegexPatternConsistency:
|
||||
"""Test that domain patterns follow expected hierarchy."""
|
||||
# DOMAIN_WITH_LOCALHOST_PORT_REGEX should accept everything
|
||||
# DOMAIN_WITH_LOCALHOST_REGEX accepts
|
||||
domain_localhost = compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
|
||||
domain_localhost_port = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
|
||||
domain_localhost = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
|
||||
domain_localhost_port = COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
|
||||
|
||||
test_cases = ["example.com", "subdomain.example.com", "localhost"]
|
||||
for test_case in test_cases:
|
||||
|
||||
@@ -16,7 +16,7 @@ class TestSettingsLoaderInit:
|
||||
|
||||
def test_init_with_valid_config_file(self, tmp_path: Path):
|
||||
"""Test initialization with a valid config file"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[Section]\nkey=value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -35,7 +35,7 @@ class TestSettingsLoaderInit:
|
||||
|
||||
def test_init_with_missing_config_file(self, tmp_path: Path):
|
||||
"""Test initialization with missing config file"""
|
||||
config_file = tmp_path / "missing.ini"
|
||||
config_file = tmp_path.joinpath("missing.ini")
|
||||
|
||||
loader = SettingsLoader(
|
||||
args={},
|
||||
@@ -60,7 +60,7 @@ class TestSettingsLoaderInit:
|
||||
|
||||
def test_init_with_log(self, tmp_path: Path):
|
||||
"""Test initialization with Log object"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[Section]\nkey=value\n")
|
||||
mock_log = Mock(spec=Log)
|
||||
|
||||
@@ -80,7 +80,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_basic(self, tmp_path: Path):
|
||||
"""Test loading basic settings without validation"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nkey1=value1\nkey2=value2\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -90,7 +90,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_with_missing_section(self, tmp_path: Path):
|
||||
"""Test loading settings with missing section"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[OtherSection]\nkey=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -100,7 +100,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_allow_not_exist(self, tmp_path: Path):
|
||||
"""Test loading settings with allow_not_exist flag"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[OtherSection]\nkey=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -110,7 +110,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_mandatory_field_present(self, tmp_path: Path):
|
||||
"""Test mandatory field validation when field is present"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nrequired_field=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -123,7 +123,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_mandatory_field_missing(self, tmp_path: Path):
|
||||
"""Test mandatory field validation when field is missing"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nother_field=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -136,7 +136,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_mandatory_field_empty(self, tmp_path: Path):
|
||||
"""Test mandatory field validation when field is empty"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nrequired_field=\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -149,7 +149,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_with_split(self, tmp_path: Path):
|
||||
"""Test splitting values into lists"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a,b,c,d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -162,7 +162,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_with_custom_split_char(self, tmp_path: Path):
|
||||
"""Test splitting with custom delimiter"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a|b|c|d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -175,7 +175,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_split_removes_spaces(self, tmp_path: Path):
|
||||
"""Test that split removes spaces from values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a, b , c , d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -188,7 +188,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_empty_split_char_fallback(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test fallback to default split char when empty"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a,b,c\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -201,9 +201,22 @@ class TestLoadSettings:
|
||||
captured = capsys.readouterr()
|
||||
assert "fallback to:" in captured.out
|
||||
|
||||
def test_load_settings_split_empty_value(self, tmp_path: Path):
|
||||
"""Test that split on empty value results in empty list"""
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
result = loader.load_settings(
|
||||
"TestSection",
|
||||
{"list_field": ["split:,"]}
|
||||
)
|
||||
|
||||
assert result["list_field"] == []
|
||||
|
||||
def test_load_settings_convert_to_int(self, tmp_path: Path):
|
||||
"""Test converting values to int"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=123\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -217,7 +230,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_to_float(self, tmp_path: Path):
|
||||
"""Test converting values to float"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=123.45\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -231,7 +244,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_to_bool_true(self, tmp_path: Path):
|
||||
"""Test converting values to boolean True"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nflag1=true\nflag2=True\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -245,7 +258,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_to_bool_false(self, tmp_path: Path):
|
||||
"""Test converting values to boolean False"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nflag1=false\nflag2=False\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -259,7 +272,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_invalid_type(self, tmp_path: Path):
|
||||
"""Test converting with invalid type raises error"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -272,7 +285,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_empty_set_to_none(self, tmp_path: Path):
|
||||
"""Test setting empty values to None"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nother=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -285,7 +298,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_empty_set_to_custom_value(self, tmp_path: Path):
|
||||
"""Test setting empty values to custom value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nother=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -298,7 +311,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_matching_valid(self, tmp_path: Path):
|
||||
"""Test matching validation with valid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nmode=production\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -311,7 +324,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_matching_invalid(self, tmp_path: Path):
|
||||
"""Test matching validation with invalid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nmode=invalid\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -324,7 +337,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_in_valid(self, tmp_path: Path):
|
||||
"""Test 'in' validation with valid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=b\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -340,7 +353,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_in_invalid(self, tmp_path: Path):
|
||||
"""Test 'in' validation with invalid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -356,7 +369,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_in_missing_target(self, tmp_path: Path):
|
||||
"""Test 'in' validation with missing target"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=a\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -369,7 +382,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_exact(self, tmp_path: Path):
|
||||
"""Test length validation with exact match"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -382,7 +395,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_exact_invalid(self, tmp_path: Path):
|
||||
"""Test length validation with exact match failure"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -395,7 +408,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_range(self, tmp_path: Path):
|
||||
"""Test length validation with range"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=testing\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -408,7 +421,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_min_only(self, tmp_path: Path):
|
||||
"""Test length validation with minimum only"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=testing\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -421,7 +434,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_max_only(self, tmp_path: Path):
|
||||
"""Test length validation with maximum only"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -434,7 +447,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_range_valid(self, tmp_path: Path):
|
||||
"""Test range validation with valid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=25\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -447,7 +460,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_range_invalid(self, tmp_path: Path):
|
||||
"""Test range validation with invalid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=100\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -460,7 +473,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_int_valid(self, tmp_path: Path):
|
||||
"""Test check:int with valid integer"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=12345\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -473,7 +486,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_int_cleanup(self, tmp_path: Path):
|
||||
"""Test check:int with cleanup"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=12a34b5\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -486,7 +499,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_email_valid(self, tmp_path: Path):
|
||||
"""Test check:string.email.basic with valid email"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nemail=test@example.com\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -499,7 +512,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_email_invalid(self, tmp_path: Path):
|
||||
"""Test check:string.email.basic with invalid email"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nemail=not-an-email\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -512,7 +525,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_override(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test command line arguments override config values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -530,7 +543,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_no_flag(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test default behavior (no args_override:yes) with list argument that has split"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=a,b,c\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -550,7 +563,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_list_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test that list arguments without split entry are skipped"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -570,7 +583,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_list_with_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test that list arguments with split entry and args_override:yes are applied"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=a,b,c\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -589,7 +602,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_no_with_mandatory(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test default behavior (no args_override:yes) with mandatory field and list args with split"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config1,config2\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -609,7 +622,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_no_with_mandatory_valid(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test default behavior with string args (always overrides due to current logic)"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -628,7 +641,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_string_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test that string arguments with args_override:yes work normally"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -647,7 +660,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_no_config_file_with_args(self, tmp_path: Path):
|
||||
"""Test loading settings without config file but with mandatory args"""
|
||||
config_file = tmp_path / "missing.ini"
|
||||
config_file = tmp_path.joinpath("missing.ini")
|
||||
|
||||
loader = SettingsLoader(
|
||||
args={"required": "value"},
|
||||
@@ -662,7 +675,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_no_config_file_missing_args(self, tmp_path: Path):
|
||||
"""Test loading settings without config file and missing args"""
|
||||
config_file = tmp_path / "missing.ini"
|
||||
config_file = tmp_path.joinpath("missing.ini")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
|
||||
@@ -674,7 +687,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_list_with_split(self, tmp_path: Path):
|
||||
"""Test check validation with list values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist=abc,def,ghi\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -687,7 +700,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_list_cleanup(self, tmp_path: Path):
|
||||
"""Test check validation cleans up list values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist=ab-c,de_f,gh!i\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -700,7 +713,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_invalid_check_type(self, tmp_path: Path):
|
||||
"""Test with invalid check type"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -717,7 +730,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_complex_validation_scenario(self, tmp_path: Path):
|
||||
"""Test complex scenario with multiple validations"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Production]\n"
|
||||
"environment=production\n"
|
||||
@@ -758,7 +771,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_email_list_validation(self, tmp_path: Path):
|
||||
"""Test email list with validation"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[EmailConfig]\n"
|
||||
"emails=test@example.com,admin@domain.org,user+tag@site.co.uk\n"
|
||||
@@ -775,7 +788,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_mixed_args_and_config(self, tmp_path: Path):
|
||||
"""Test mixing command line args and config file"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Settings]\n"
|
||||
"value1=config_value1\n"
|
||||
@@ -796,7 +809,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_multiple_check_types(self, tmp_path: Path):
|
||||
"""Test multiple different check types"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Checks]\n"
|
||||
"numbers=123,456,789\n"
|
||||
@@ -823,7 +836,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_args_no_and_list_skip_combination(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test combination of args_override:yes flag and list argument skip behavior"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Settings]\n"
|
||||
"no_override=a,b,c\n"
|
||||
|
||||
461
tests/unit/db_handling/test_sql_main.py
Normal file
461
tests/unit/db_handling/test_sql_main.py
Normal file
@@ -0,0 +1,461 @@
|
||||
"""
|
||||
PyTest: db_handling/sql_main
|
||||
Tests for SQLMain class - Main SQL interface wrapper
|
||||
|
||||
Note: Pylance warnings about "Redefining name from outer scope" in fixtures are expected.
|
||||
This is standard pytest fixture behavior where fixture parameters shadow fixture definitions.
|
||||
"""
|
||||
# pylint: disable=redefined-outer-name,too-many-public-methods,protected-access
|
||||
# pyright: reportUnknownParameterType=false, reportUnknownArgumentType=false
|
||||
# pyright: reportMissingParameterType=false, reportUnknownVariableType=false
|
||||
# pyright: reportArgumentType=false, reportGeneralTypeIssues=false
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Generator
|
||||
from unittest.mock import MagicMock, patch
|
||||
import pytest
|
||||
from corelibs.db_handling.sql_main import SQLMain, IDENT_SPLIT_CHARACTER
|
||||
from corelibs.db_handling.sqlite_io import SQLiteIO
|
||||
|
||||
|
||||
# Test fixtures
|
||||
@pytest.fixture
|
||||
def mock_logger() -> MagicMock:
|
||||
"""Create a mock logger for testing"""
|
||||
logger = MagicMock()
|
||||
logger.debug = MagicMock()
|
||||
logger.info = MagicMock()
|
||||
logger.warning = MagicMock()
|
||||
logger.error = MagicMock()
|
||||
return logger
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_db_path(tmp_path: Path) -> Path:
|
||||
"""Create a temporary database file path"""
|
||||
return tmp_path / "test_database.db"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_sqlite_io() -> Generator[MagicMock, None, None]:
|
||||
"""Create a mock SQLiteIO instance"""
|
||||
mock_io = MagicMock(spec=SQLiteIO)
|
||||
mock_io.conn = MagicMock()
|
||||
mock_io.db_connected = MagicMock(return_value=True)
|
||||
mock_io.db_close = MagicMock()
|
||||
mock_io.execute_query = MagicMock(return_value=[])
|
||||
yield mock_io
|
||||
|
||||
|
||||
# Test constant
|
||||
class TestConstants:
|
||||
"""Tests for module-level constants"""
|
||||
|
||||
def test_ident_split_character(self):
|
||||
"""Test that IDENT_SPLIT_CHARACTER is defined correctly"""
|
||||
assert IDENT_SPLIT_CHARACTER == ':'
|
||||
|
||||
|
||||
# Test SQLMain class initialization
|
||||
class TestSQLMainInit:
|
||||
"""Tests for SQLMain.__init__"""
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_successful_initialization_sqlite(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test successful initialization with SQLite"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
assert sql_main.log == mock_logger
|
||||
assert sql_main.dbh == mock_sqlite_instance
|
||||
assert sql_main.db_target == 'sqlite'
|
||||
mock_sqlite_class.assert_called_once_with(mock_logger, str(temp_db_path), row_factory='Dict')
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_initialization_connection_failure(self, mock_sqlite_class: MagicMock, mock_logger: MagicMock):
|
||||
"""Test initialization fails when connection cannot be established"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = None
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=False)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = 'sqlite:/path/to/db.db'
|
||||
with pytest.raises(ValueError, match='DB Connection failed for: sqlite'):
|
||||
SQLMain(mock_logger, db_ident)
|
||||
|
||||
def test_initialization_invalid_db_target(self, mock_logger: MagicMock):
|
||||
"""Test initialization with unsupported database target"""
|
||||
db_ident = 'postgresql:/path/to/db'
|
||||
with pytest.raises(ValueError, match='SQL interface for postgresql is not implemented'):
|
||||
SQLMain(mock_logger, db_ident)
|
||||
|
||||
def test_initialization_malformed_db_ident(self, mock_logger: MagicMock):
|
||||
"""Test initialization with malformed db_ident string"""
|
||||
db_ident = 'sqlite_no_colon'
|
||||
with pytest.raises(ValueError):
|
||||
SQLMain(mock_logger, db_ident)
|
||||
|
||||
|
||||
# Test SQLMain.connect method
|
||||
class TestSQLMainConnect:
|
||||
"""Tests for SQLMain.connect"""
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_connect_when_already_connected(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test connect warns when already connected"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
# Reset mock to check second call
|
||||
mock_logger.warning.reset_mock()
|
||||
|
||||
# Try to connect again
|
||||
sql_main.connect(f'sqlite:{temp_db_path}')
|
||||
|
||||
# Should have warned about existing connection
|
||||
mock_logger.warning.assert_called_once()
|
||||
assert 'already exists' in str(mock_logger.warning.call_args)
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_connect_sqlite_success(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test successful SQLite connection"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
sql_main = SQLMain.__new__(SQLMain)
|
||||
sql_main.log = mock_logger
|
||||
sql_main.dbh = None
|
||||
sql_main.db_target = None
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main.connect(db_ident)
|
||||
|
||||
assert sql_main.db_target == 'sqlite'
|
||||
assert sql_main.dbh == mock_sqlite_instance
|
||||
mock_sqlite_class.assert_called_once_with(mock_logger, str(temp_db_path), row_factory='Dict')
|
||||
|
||||
def test_connect_unsupported_database(self, mock_logger: MagicMock):
|
||||
"""Test connect with unsupported database type"""
|
||||
sql_main = SQLMain.__new__(SQLMain)
|
||||
sql_main.log = mock_logger
|
||||
sql_main.dbh = None
|
||||
sql_main.db_target = None
|
||||
|
||||
db_ident = 'mysql:/path/to/db'
|
||||
with pytest.raises(ValueError, match='SQL interface for mysql is not implemented'):
|
||||
sql_main.connect(db_ident)
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_connect_db_connection_failed(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test connect raises error when DB connection fails"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=False)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
sql_main = SQLMain.__new__(SQLMain)
|
||||
sql_main.log = mock_logger
|
||||
sql_main.dbh = None
|
||||
sql_main.db_target = None
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
with pytest.raises(ValueError, match='DB Connection failed for: sqlite'):
|
||||
sql_main.connect(db_ident)
|
||||
|
||||
|
||||
# Test SQLMain.close method
|
||||
class TestSQLMainClose:
|
||||
"""Tests for SQLMain.close"""
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_close_successful(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test successful database close"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_instance.db_close = MagicMock()
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
sql_main.close()
|
||||
|
||||
mock_sqlite_instance.db_close.assert_called_once()
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_close_when_not_connected(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test close when not connected does nothing"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_instance.db_close = MagicMock()
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
# Change db_connected to return False to simulate disconnection
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=False)
|
||||
|
||||
sql_main.close()
|
||||
|
||||
# Should not raise error and should exit early
|
||||
assert mock_sqlite_instance.db_close.call_count == 0
|
||||
|
||||
def test_close_when_dbh_is_none(self, mock_logger: MagicMock):
|
||||
"""Test close when dbh is None"""
|
||||
sql_main = SQLMain.__new__(SQLMain)
|
||||
sql_main.log = mock_logger
|
||||
sql_main.dbh = None
|
||||
sql_main.db_target = 'sqlite'
|
||||
|
||||
# Should not raise error
|
||||
sql_main.close()
|
||||
|
||||
|
||||
# Test SQLMain.connected method
|
||||
class TestSQLMainConnected:
|
||||
"""Tests for SQLMain.connected"""
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_connected_returns_true(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test connected returns True when connected"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
assert sql_main.connected() is True
|
||||
mock_logger.warning.assert_not_called()
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_connected_returns_false_when_not_connected(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test connected returns False and warns when not connected"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
# Reset warning calls from init
|
||||
mock_logger.warning.reset_mock()
|
||||
|
||||
# Change db_connected to return False to simulate disconnection
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=False)
|
||||
|
||||
assert sql_main.connected() is False
|
||||
mock_logger.warning.assert_called_once()
|
||||
assert 'No connection' in str(mock_logger.warning.call_args)
|
||||
|
||||
def test_connected_returns_false_when_dbh_is_none(self, mock_logger: MagicMock):
|
||||
"""Test connected returns False when dbh is None"""
|
||||
sql_main = SQLMain.__new__(SQLMain)
|
||||
sql_main.log = mock_logger
|
||||
sql_main.dbh = None
|
||||
sql_main.db_target = 'sqlite'
|
||||
|
||||
assert sql_main.connected() is False
|
||||
mock_logger.warning.assert_called_once()
|
||||
|
||||
|
||||
# Test SQLMain.process_query method
|
||||
class TestSQLMainProcessQuery:
|
||||
"""Tests for SQLMain.process_query"""
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_process_query_success_no_params(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test successful query execution without parameters"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
expected_result = [{'id': 1, 'name': 'test'}]
|
||||
mock_sqlite_instance.execute_query = MagicMock(return_value=expected_result)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
query = "SELECT * FROM test"
|
||||
result = sql_main.process_query(query)
|
||||
|
||||
assert result == expected_result
|
||||
mock_sqlite_instance.execute_query.assert_called_once_with(query, None)
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_process_query_success_with_params(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test successful query execution with parameters"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
expected_result = [{'id': 1, 'name': 'test'}]
|
||||
mock_sqlite_instance.execute_query = MagicMock(return_value=expected_result)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
query = "SELECT * FROM test WHERE id = ?"
|
||||
params = (1,)
|
||||
result = sql_main.process_query(query, params)
|
||||
|
||||
assert result == expected_result
|
||||
mock_sqlite_instance.execute_query.assert_called_once_with(query, params)
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_process_query_returns_false_on_error(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test query returns False when execute_query fails"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_instance.execute_query = MagicMock(return_value=False)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
query = "SELECT * FROM nonexistent"
|
||||
result = sql_main.process_query(query)
|
||||
|
||||
assert result is False
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_process_query_dbh_is_none(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test query returns False when dbh is None"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
# Manually set dbh to None
|
||||
sql_main.dbh = None
|
||||
|
||||
query = "SELECT * FROM test"
|
||||
result = sql_main.process_query(query)
|
||||
|
||||
assert result is False
|
||||
mock_logger.error.assert_called_once()
|
||||
assert 'Problem connecting to db' in str(mock_logger.error.call_args)
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_process_query_returns_empty_list(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test query returns empty list when no results"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_instance.execute_query = MagicMock(return_value=[])
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
query = "SELECT * FROM test WHERE 1=0"
|
||||
result = sql_main.process_query(query)
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
# Integration-like tests
|
||||
class TestSQLMainIntegration:
|
||||
"""Integration-like tests for complete workflows"""
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_full_workflow_connect_query_close(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test complete workflow: connect, query, close"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_instance.execute_query = MagicMock(return_value=[{'count': 5}])
|
||||
mock_sqlite_instance.db_close = MagicMock()
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
# Execute query
|
||||
result = sql_main.process_query("SELECT COUNT(*) as count FROM test")
|
||||
assert result == [{'count': 5}]
|
||||
|
||||
# Check connected
|
||||
assert sql_main.connected() is True
|
||||
|
||||
# Close connection
|
||||
sql_main.close()
|
||||
mock_sqlite_instance.db_close.assert_called_once()
|
||||
|
||||
@patch('corelibs.db_handling.sql_main.SQLiteIO')
|
||||
def test_multiple_queries_same_connection(
|
||||
self, mock_sqlite_class: MagicMock, mock_logger: MagicMock, temp_db_path: Path
|
||||
):
|
||||
"""Test multiple queries on the same connection"""
|
||||
mock_sqlite_instance = MagicMock()
|
||||
mock_sqlite_instance.conn = MagicMock()
|
||||
mock_sqlite_instance.db_connected = MagicMock(return_value=True)
|
||||
mock_sqlite_instance.execute_query = MagicMock(side_effect=[
|
||||
[{'id': 1}],
|
||||
[{'id': 2}],
|
||||
[{'id': 3}]
|
||||
])
|
||||
mock_sqlite_class.return_value = mock_sqlite_instance
|
||||
|
||||
db_ident = f'sqlite:{temp_db_path}'
|
||||
sql_main = SQLMain(mock_logger, db_ident)
|
||||
|
||||
result1 = sql_main.process_query("SELECT * FROM test WHERE id = 1")
|
||||
result2 = sql_main.process_query("SELECT * FROM test WHERE id = 2")
|
||||
result3 = sql_main.process_query("SELECT * FROM test WHERE id = 3")
|
||||
|
||||
assert result1 == [{'id': 1}]
|
||||
assert result2 == [{'id': 2}]
|
||||
assert result3 == [{'id': 3}]
|
||||
assert mock_sqlite_instance.execute_query.call_count == 3
|
||||
|
||||
|
||||
# __END__
|
||||
@@ -4,7 +4,101 @@ tests for corelibs.iterator_handling.fingerprint
|
||||
|
||||
from typing import Any
|
||||
import pytest
|
||||
from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc
|
||||
from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc, hash_object
|
||||
|
||||
|
||||
class TestHashObject:
|
||||
"""Tests for hash_object function"""
|
||||
|
||||
def test_hash_object_simple_dict(self):
|
||||
"""Test hashing a simple dictionary with hash_object"""
|
||||
data = {"key1": "value1", "key2": "value2"}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64 # SHA256 produces 64 hex characters
|
||||
|
||||
def test_hash_object_mixed_keys(self):
|
||||
"""Test hash_object with mixed int and string keys"""
|
||||
data = {"key1": "value1", 1: "value2", 2: "value3"}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_consistency(self):
|
||||
"""Test that hash_object produces consistent results"""
|
||||
data = {"str_key": "value", 123: "number_key"}
|
||||
hash1 = hash_object(data)
|
||||
hash2 = hash_object(data)
|
||||
|
||||
assert hash1 == hash2
|
||||
|
||||
def test_hash_object_order_independence(self):
|
||||
"""Test that hash_object is order-independent"""
|
||||
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
|
||||
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
|
||||
hash1 = hash_object(data1)
|
||||
hash2 = hash_object(data2)
|
||||
|
||||
assert hash1 == hash2
|
||||
|
||||
def test_hash_object_list_of_dicts_mixed_keys(self):
|
||||
"""Test hash_object with list of dicts containing mixed keys"""
|
||||
data = [
|
||||
{"name": "item1", 1: "value1"},
|
||||
{"name": "item2", 2: "value2"}
|
||||
]
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_nested_mixed_keys(self):
|
||||
"""Test hash_object with nested structures containing mixed keys"""
|
||||
data = {
|
||||
"outer": {
|
||||
"inner": "value",
|
||||
1: "mixed_key"
|
||||
},
|
||||
2: "another_mixed"
|
||||
}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_different_data(self):
|
||||
"""Test that different data produces different hashes"""
|
||||
data1 = {"key": "value", 1: "one"}
|
||||
data2 = {"key": "value", 2: "two"}
|
||||
hash1 = hash_object(data1)
|
||||
hash2 = hash_object(data2)
|
||||
|
||||
assert hash1 != hash2
|
||||
|
||||
def test_hash_object_complex_nested(self):
|
||||
"""Test hash_object with complex nested structures"""
|
||||
data = {
|
||||
"level1": {
|
||||
"level2": {
|
||||
1: "value",
|
||||
"key": [1, 2, {"nested": "deep", 3: "int_key"}]
|
||||
}
|
||||
}
|
||||
}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_list_with_tuples(self):
|
||||
"""Test hash_object with lists containing tuples"""
|
||||
data = [("a", 1), ("b", 2), {1: "mixed", "key": "value"}]
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
|
||||
class TestDictHashFrozen:
|
||||
@@ -279,6 +373,116 @@ class TestDictHashCrc:
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_dict_hash_crc_fallback_mixed_keys(self):
|
||||
"""Test dict_hash_crc fallback with mixed int and string keys"""
|
||||
data = {"key1": "value1", 1: "value2", 2: "value3"}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
# Fallback prefixes with "HO_"
|
||||
assert result.startswith("HO_")
|
||||
# Hash should be 64 chars + 3 char prefix = 67 total
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_fallback_consistency(self):
|
||||
"""Test that fallback produces consistent hashes"""
|
||||
data = {"str_key": "value", 123: "number_key", 456: "another"}
|
||||
hash1 = dict_hash_crc(data)
|
||||
hash2 = dict_hash_crc(data)
|
||||
|
||||
assert hash1 == hash2
|
||||
assert hash1.startswith("HO_")
|
||||
|
||||
def test_dict_hash_crc_fallback_order_independence(self):
|
||||
"""Test that fallback is order-independent for mixed-key dicts"""
|
||||
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
|
||||
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
|
||||
hash1 = dict_hash_crc(data1)
|
||||
hash2 = dict_hash_crc(data2)
|
||||
|
||||
assert hash1 == hash2
|
||||
assert hash1.startswith("HO_")
|
||||
|
||||
def test_dict_hash_crc_fallback_list_of_dicts_mixed_keys(self):
|
||||
"""Test fallback with list of dicts containing mixed keys"""
|
||||
data = [
|
||||
{"name": "item1", 1: "value1"},
|
||||
{"name": "item2", 2: "value2"},
|
||||
{3: "value3", "type": "mixed"}
|
||||
]
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result.startswith("HO_")
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_fallback_nested_mixed_keys(self):
|
||||
"""Test fallback with nested dicts containing mixed keys"""
|
||||
data = {
|
||||
"outer": {
|
||||
"inner": "value",
|
||||
1: "mixed_key"
|
||||
},
|
||||
2: "another_mixed"
|
||||
}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result.startswith("HO_")
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_fallback_different_data(self):
|
||||
"""Test that different mixed-key data produces different hashes"""
|
||||
data1 = {"key": "value", 1: "one"}
|
||||
data2 = {"key": "value", 2: "two"}
|
||||
hash1 = dict_hash_crc(data1)
|
||||
hash2 = dict_hash_crc(data2)
|
||||
|
||||
assert hash1 != hash2
|
||||
assert hash1.startswith("HO_")
|
||||
assert hash2.startswith("HO_")
|
||||
|
||||
def test_dict_hash_crc_fallback_complex_structure(self):
|
||||
"""Test fallback with complex nested structure with mixed keys"""
|
||||
data = [
|
||||
{
|
||||
"id": 1,
|
||||
1: "first",
|
||||
"data": {
|
||||
"nested": "value",
|
||||
100: "nested_int_key"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
2: "second",
|
||||
"items": [1, 2, 3]
|
||||
}
|
||||
]
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result.startswith("HO_")
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_no_fallback_string_keys_only(self):
|
||||
"""Test that string-only keys don't trigger fallback"""
|
||||
data = {"key1": "value1", "key2": "value2", "key3": "value3"}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert not result.startswith("HO_")
|
||||
assert len(result) == 64
|
||||
|
||||
def test_dict_hash_crc_no_fallback_int_keys_only(self):
|
||||
"""Test that int-only keys don't trigger fallback"""
|
||||
data = {1: "one", 2: "two", 3: "three"}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert not result.startswith("HO_")
|
||||
assert len(result) == 64
|
||||
|
||||
|
||||
class TestComparisonBetweenHashFunctions:
|
||||
"""Tests comparing dict_hash_frozen and dict_hash_crc"""
|
||||
|
||||
@@ -4,7 +4,7 @@ iterator_handling.list_helepr tests
|
||||
|
||||
from typing import Any
|
||||
import pytest
|
||||
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list
|
||||
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list, make_unique_list_of_dicts
|
||||
|
||||
|
||||
class TestConvertToList:
|
||||
@@ -298,3 +298,225 @@ class TestPerformance:
|
||||
# Should still work correctly despite duplicates
|
||||
assert set(result) == {1, 3}
|
||||
assert isinstance(result, list)
|
||||
|
||||
|
||||
class TestMakeUniqueListOfDicts:
|
||||
"""Test cases for make_unique_list_of_dicts function"""
|
||||
|
||||
def test_basic_duplicate_removal(self):
|
||||
"""Test basic removal of duplicate dictionaries"""
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 3, "b": 4}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": 1, "b": 2} in result
|
||||
assert {"a": 3, "b": 4} in result
|
||||
|
||||
def test_order_independent_duplicates(self):
|
||||
"""Test that dictionaries with different key orders are treated as duplicates"""
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"b": 2, "a": 1}, # Same content, different order
|
||||
{"a": 3, "b": 4}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": 1, "b": 2} in result
|
||||
assert {"a": 3, "b": 4} in result
|
||||
|
||||
def test_empty_list(self):
|
||||
"""Test with empty list"""
|
||||
result = make_unique_list_of_dicts([])
|
||||
assert result == []
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_single_dict(self):
|
||||
"""Test with single dictionary"""
|
||||
dict_list = [{"a": 1, "b": 2}]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert result == [{"a": 1, "b": 2}]
|
||||
|
||||
def test_all_unique(self):
|
||||
"""Test when all dictionaries are unique"""
|
||||
dict_list = [
|
||||
{"a": 1},
|
||||
{"b": 2},
|
||||
{"c": 3},
|
||||
{"d": 4}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 4
|
||||
for d in dict_list:
|
||||
assert d in result
|
||||
|
||||
def test_all_duplicates(self):
|
||||
"""Test when all dictionaries are duplicates"""
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 1, "b": 2},
|
||||
{"b": 2, "a": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 1
|
||||
assert result[0] == {"a": 1, "b": 2}
|
||||
|
||||
def test_nested_values(self):
|
||||
"""Test with nested structures as values"""
|
||||
dict_list = [
|
||||
{"a": [1, 2], "b": 3},
|
||||
{"a": [1, 2], "b": 3},
|
||||
{"a": [1, 3], "b": 3}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": [1, 2], "b": 3} in result
|
||||
assert {"a": [1, 3], "b": 3} in result
|
||||
|
||||
def test_different_value_types(self):
|
||||
"""Test with different value types"""
|
||||
dict_list = [
|
||||
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
|
||||
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
|
||||
{"str": "world", "int": 99, "float": 2.71, "bool": False}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_empty_dicts(self):
|
||||
"""Test with empty dictionaries"""
|
||||
dict_list: list[Any] = [
|
||||
{},
|
||||
{},
|
||||
{"a": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {} in result
|
||||
assert {"a": 1} in result
|
||||
|
||||
def test_single_key_dicts(self):
|
||||
"""Test with single key dictionaries"""
|
||||
dict_list = [
|
||||
{"a": 1},
|
||||
{"a": 1},
|
||||
{"a": 2},
|
||||
{"b": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 3
|
||||
assert {"a": 1} in result
|
||||
assert {"a": 2} in result
|
||||
assert {"b": 1} in result
|
||||
|
||||
def test_many_keys(self):
|
||||
"""Test with dictionaries containing many keys"""
|
||||
dict1 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
|
||||
dict2 = {"e": 5, "d": 4, "c": 3, "b": 2, "a": 1} # Same, different order
|
||||
dict3 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 6} # Different value
|
||||
dict_list = [dict1, dict2, dict3]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_numeric_keys(self):
|
||||
"""Test with numeric keys"""
|
||||
dict_list = [
|
||||
{1: "one", 2: "two"},
|
||||
{2: "two", 1: "one"},
|
||||
{1: "one", 2: "three"}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_none_values(self):
|
||||
"""Test with None values"""
|
||||
dict_list = [
|
||||
{"a": None, "b": 2},
|
||||
{"a": None, "b": 2},
|
||||
{"a": 1, "b": None}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": None, "b": 2} in result
|
||||
assert {"a": 1, "b": None} in result
|
||||
|
||||
def test_mixed_key_types(self):
|
||||
"""Test with mixed key types (string and numeric)"""
|
||||
dict_list = [
|
||||
{"a": 1, 1: "one"},
|
||||
{1: "one", "a": 1},
|
||||
{"a": 2, 1: "one"}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
@pytest.mark.parametrize("dict_list,expected_length", [
|
||||
([{"a": 1}, {"a": 1}, {"a": 1}], 1),
|
||||
([{"a": 1}, {"a": 2}, {"a": 3}], 3),
|
||||
([{"a": 1, "b": 2}, {"b": 2, "a": 1}], 1),
|
||||
([{}, {}], 1),
|
||||
([{"x": [1, 2]}, {"x": [1, 2]}], 1),
|
||||
([{"a": 1}, {"b": 2}, {"c": 3}], 3),
|
||||
]) # pyright: ignore[reportUnknownArgumentType]
|
||||
def test_parametrized_unique_dicts(self, dict_list: list[Any], expected_length: int):
|
||||
"""Test make_unique_list_of_dicts with various input combinations"""
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == expected_length
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_large_list(self):
|
||||
"""Test with a large list of dictionaries"""
|
||||
dict_list = [{"id": i % 100, "value": f"val_{i % 100}"} for i in range(1000)]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
# Should have 100 unique dicts (0-99)
|
||||
assert len(result) == 100
|
||||
|
||||
def test_preserves_last_occurrence(self):
|
||||
"""Test behavior with duplicate entries"""
|
||||
# The function uses dict comprehension, which keeps last occurrence
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 3, "b": 4},
|
||||
{"a": 1, "b": 2}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
# Just verify correct unique count, order may vary
|
||||
|
||||
def test_nested_dicts(self):
|
||||
"""Test with nested dictionaries"""
|
||||
dict_list = [
|
||||
{"outer": {"inner": 1}},
|
||||
{"outer": {"inner": 1}},
|
||||
{"outer": {"inner": 2}}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_string_values_case_sensitive(self):
|
||||
"""Test that string values are case-sensitive"""
|
||||
dict_list = [
|
||||
{"name": "John"},
|
||||
{"name": "john"},
|
||||
{"name": "JOHN"},
|
||||
{"name": "John"}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 3
|
||||
|
||||
def test_boolean_values(self):
|
||||
"""Test with boolean values"""
|
||||
dict_list = [
|
||||
{"flag": True, "count": 1},
|
||||
{"count": 1, "flag": True},
|
||||
{"flag": False, "count": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"flag": True, "count": 1} in result
|
||||
assert {"flag": False, "count": 1} in result
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -28,6 +28,7 @@ def tmp_log_path(tmp_path: Path) -> Path:
|
||||
@pytest.fixture
|
||||
def basic_log_settings() -> LogSettings:
|
||||
"""Basic log settings for testing"""
|
||||
# Return a new dict each time to avoid state pollution
|
||||
return {
|
||||
"log_level_console": LoggingLevel.WARNING,
|
||||
"log_level_file": LoggingLevel.DEBUG,
|
||||
@@ -308,4 +309,54 @@ class TestUpdateConsoleFormatter:
|
||||
# Verify message was logged
|
||||
assert "Test warning message" in caplog.text
|
||||
|
||||
def test_log_console_format_option_set_to_none(
|
||||
self, tmp_log_path: Path
|
||||
):
|
||||
"""Test that when log_console_format option is set to None, it uses ConsoleFormatSettings.ALL"""
|
||||
# Save the original DEFAULT_LOG_SETTINGS to restore it after test
|
||||
original_default = Log.DEFAULT_LOG_SETTINGS.copy()
|
||||
|
||||
try:
|
||||
# Reset DEFAULT_LOG_SETTINGS to ensure clean state
|
||||
Log.DEFAULT_LOG_SETTINGS = {
|
||||
"log_level_console": Log.DEFAULT_LOG_LEVEL_CONSOLE,
|
||||
"log_level_file": Log.DEFAULT_LOG_LEVEL_FILE,
|
||||
"per_run_log": False,
|
||||
"console_enabled": True,
|
||||
"console_color_output_enabled": True,
|
||||
"console_format_type": ConsoleFormatSettings.ALL,
|
||||
"add_start_info": True,
|
||||
"add_end_info": False,
|
||||
"log_queue": None,
|
||||
}
|
||||
|
||||
# Create a fresh settings dict with console_format_type explicitly set to None
|
||||
settings: LogSettings = {
|
||||
"log_level_console": LoggingLevel.WARNING,
|
||||
"log_level_file": LoggingLevel.DEBUG,
|
||||
"per_run_log": False,
|
||||
"console_enabled": True,
|
||||
"console_color_output_enabled": False,
|
||||
"console_format_type": None, # type: ignore
|
||||
"add_start_info": False,
|
||||
"add_end_info": False,
|
||||
"log_queue": None,
|
||||
}
|
||||
|
||||
# Verify that None is explicitly set in the input
|
||||
assert settings['console_format_type'] is None
|
||||
|
||||
log = Log(
|
||||
log_path=tmp_log_path,
|
||||
log_name="test_log",
|
||||
log_settings=settings
|
||||
)
|
||||
|
||||
# Verify that None was replaced with ConsoleFormatSettings.ALL
|
||||
# The Log class should replace None with the default value (ALL)
|
||||
assert log.log_settings['console_format_type'] == ConsoleFormatSettings.ALL
|
||||
finally:
|
||||
# Restore original DEFAULT_LOG_SETTINGS
|
||||
Log.DEFAULT_LOG_SETTINGS = original_default
|
||||
|
||||
# __END__
|
||||
|
||||
0
tests/unit/math_handling/__init__.py
Normal file
0
tests/unit/math_handling/__init__.py
Normal file
121
tests/unit/math_handling/test_math_helpers.py
Normal file
121
tests/unit/math_handling/test_math_helpers.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""
|
||||
Unit tests for math_helpers module
|
||||
"""
|
||||
|
||||
from corelibs.math_handling.math_helpers import gcd, lcd
|
||||
|
||||
|
||||
class TestGcd:
|
||||
"""Test cases for the gcd (Greatest Common Divisor) function"""
|
||||
|
||||
def test_gcd_basic_positive_numbers(self):
|
||||
"""Test GCD with basic positive numbers"""
|
||||
assert gcd(12, 8) == 4
|
||||
assert gcd(15, 10) == 5
|
||||
assert gcd(21, 14) == 7
|
||||
|
||||
def test_gcd_coprime_numbers(self):
|
||||
"""Test GCD with coprime numbers (GCD should be 1)"""
|
||||
assert gcd(13, 7) == 1
|
||||
assert gcd(17, 19) == 1
|
||||
assert gcd(25, 49) == 1
|
||||
|
||||
def test_gcd_same_numbers(self):
|
||||
"""Test GCD with same numbers"""
|
||||
assert gcd(5, 5) == 5
|
||||
assert gcd(100, 100) == 100
|
||||
|
||||
def test_gcd_with_zero(self):
|
||||
"""Test GCD when one or both numbers are zero"""
|
||||
assert gcd(0, 5) == 5
|
||||
assert gcd(5, 0) == 5
|
||||
assert gcd(0, 0) == 0
|
||||
|
||||
def test_gcd_with_one(self):
|
||||
"""Test GCD when one number is 1"""
|
||||
assert gcd(1, 5) == 1
|
||||
assert gcd(100, 1) == 1
|
||||
|
||||
def test_gcd_large_numbers(self):
|
||||
"""Test GCD with large numbers"""
|
||||
assert gcd(1000000, 500000) == 500000
|
||||
assert gcd(123456, 789012) == 12
|
||||
|
||||
def test_gcd_reversed_order(self):
|
||||
"""Test GCD is commutative (order doesn't matter)"""
|
||||
assert gcd(12, 8) == gcd(8, 12)
|
||||
assert gcd(100, 35) == gcd(35, 100)
|
||||
|
||||
def test_gcd_negative_numbers(self):
|
||||
"""Test GCD with negative numbers"""
|
||||
assert gcd(-12, 8) == 4
|
||||
assert gcd(12, -8) == 4
|
||||
assert gcd(-12, -8) == 4
|
||||
|
||||
def test_gcd_multiples(self):
|
||||
"""Test GCD when one number is a multiple of the other"""
|
||||
assert gcd(10, 5) == 5
|
||||
assert gcd(100, 25) == 25
|
||||
assert gcd(7, 21) == 7
|
||||
|
||||
|
||||
class TestLcd:
|
||||
"""Test cases for the lcd (Least Common Denominator/Multiple) function"""
|
||||
|
||||
def test_lcd_basic_positive_numbers(self):
|
||||
"""Test LCD with basic positive numbers"""
|
||||
assert lcd(4, 6) == 12
|
||||
assert lcd(3, 5) == 15
|
||||
assert lcd(12, 8) == 24
|
||||
|
||||
def test_lcd_coprime_numbers(self):
|
||||
"""Test LCD with coprime numbers (should be their product)"""
|
||||
assert lcd(7, 13) == 91
|
||||
assert lcd(11, 13) == 143
|
||||
assert lcd(5, 7) == 35
|
||||
|
||||
def test_lcd_same_numbers(self):
|
||||
"""Test LCD with same numbers"""
|
||||
assert lcd(5, 5) == 5
|
||||
assert lcd(100, 100) == 100
|
||||
|
||||
def test_lcd_with_one(self):
|
||||
"""Test LCD when one number is 1"""
|
||||
assert lcd(1, 5) == 5
|
||||
assert lcd(100, 1) == 100
|
||||
|
||||
def test_lcd_with_zero(self):
|
||||
"""Test LCD when one or both numbers are zero"""
|
||||
assert lcd(0, 5) == 0
|
||||
assert lcd(5, 0) == 0
|
||||
assert lcd(0, 0) == 0
|
||||
|
||||
def test_lcd_large_numbers(self):
|
||||
"""Test LCD with large numbers"""
|
||||
assert lcd(100, 150) == 300
|
||||
assert lcd(1000, 500) == 1000
|
||||
|
||||
def test_lcd_reversed_order(self):
|
||||
"""Test LCD is commutative (order doesn't matter)"""
|
||||
assert lcd(4, 6) == lcd(6, 4)
|
||||
assert lcd(12, 18) == lcd(18, 12)
|
||||
|
||||
def test_lcd_negative_numbers(self):
|
||||
"""Test LCD with negative numbers"""
|
||||
assert lcd(-4, 6) == 12
|
||||
assert lcd(4, -6) == 12
|
||||
assert lcd(-4, -6) == 12
|
||||
|
||||
def test_lcd_multiples(self):
|
||||
"""Test LCD when one number is a multiple of the other"""
|
||||
assert lcd(5, 10) == 10
|
||||
assert lcd(3, 9) == 9
|
||||
assert lcd(25, 100) == 100
|
||||
|
||||
def test_lcd_gcd_relationship(self):
|
||||
"""Test the mathematical relationship between LCD and GCD: lcd(a,b) * gcd(a,b) = a * b"""
|
||||
test_cases = [(12, 8), (15, 10), (21, 14), (100, 35)]
|
||||
for a, b in test_cases:
|
||||
assert lcd(a, b) * gcd(a, b) == a * b
|
||||
|
||||
# __END__
|
||||
Reference in New Issue
Block a user