Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d098eb58f3 | ||
|
|
5319a059ad | ||
|
|
163b8c4018 | ||
|
|
6322b95068 | ||
|
|
715ed1f9c2 | ||
|
|
82a759dd21 | ||
|
|
fe913608c4 | ||
|
|
79f9c5d1c6 | ||
|
|
3d091129e2 | ||
|
|
1a978f786d | ||
|
|
51669d3c5f | ||
|
|
d128dcb479 | ||
|
|
84286593f6 | ||
|
|
8d97f09e5e | ||
|
|
2748bc19be | ||
|
|
0b3c8fc774 | ||
|
|
7da18e0f00 | ||
|
|
49e38081ad | ||
|
|
a14f993a31 | ||
|
|
ae938f9909 | ||
|
|
f91e0bb93a | ||
|
|
d3f61005cf | ||
|
|
2923a3e88b | ||
|
|
a73ced0067 | ||
|
|
f89b91fe7f |
@@ -1,7 +1,7 @@
|
||||
# MARK: Project info
|
||||
[project]
|
||||
name = "corelibs"
|
||||
version = "0.43.2"
|
||||
version = "0.48.0"
|
||||
description = "Collection of utils for Python scripts"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
|
||||
@@ -19,9 +19,26 @@ def compile_re(reg: str) -> re.Pattern[str]:
|
||||
|
||||
|
||||
# email regex
|
||||
EMAIL_BASIC_REGEX: str = r"""
|
||||
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
||||
SUB_EMAIL_BASIC_REGEX: str = r"""
|
||||
[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}
|
||||
"""
|
||||
EMAIL_BASIC_REGEX = rf"^{SUB_EMAIL_BASIC_REGEX}$"
|
||||
# name + email regex for email sending type like "foo bar" <email@mail.com>
|
||||
NAME_EMAIL_SIMPLE_REGEX = r"""
|
||||
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
|
||||
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
|
||||
<(?P<email3>[^>]+)>|
|
||||
(?P<email4>[^\s<>]+))\s*$
|
||||
"""
|
||||
# name + email with the basic regex set
|
||||
NAME_EMAIL_BASIC_REGEX = rf"""
|
||||
^\s*(?:
|
||||
"(?P<name1>[^"]+)"\s*<(?P<email1>{SUB_EMAIL_BASIC_REGEX})>|
|
||||
(?P<name2>.+?)\s*<(?P<email2>{SUB_EMAIL_BASIC_REGEX})>|
|
||||
<(?P<email3>{SUB_EMAIL_BASIC_REGEX})>|
|
||||
(?P<email4>{SUB_EMAIL_BASIC_REGEX})
|
||||
)\s*$
|
||||
"""
|
||||
# Domain regex with localhost
|
||||
DOMAIN_WITH_LOCALHOST_REGEX: str = r"""
|
||||
|
||||
23
src/corelibs/check_handling/regex_constants_compiled.py
Normal file
23
src/corelibs/check_handling/regex_constants_compiled.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""
|
||||
List of regex compiled strings that can be used
|
||||
"""
|
||||
|
||||
from corelibs.check_handling.regex_constants import (
|
||||
compile_re,
|
||||
EMAIL_BASIC_REGEX,
|
||||
NAME_EMAIL_SIMPLE_REGEX,
|
||||
NAME_EMAIL_BASIC_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
DOMAIN_REGEX
|
||||
)
|
||||
|
||||
# all above in compiled form
|
||||
COMPILED_EMAIL_BASIC_REGEX = compile_re(EMAIL_BASIC_REGEX)
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX = compile_re(NAME_EMAIL_SIMPLE_REGEX)
|
||||
COMPILED_NAME_EMAIL_BASIC_REGEX = compile_re(NAME_EMAIL_BASIC_REGEX)
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
|
||||
COMPILED_DOMAIN_REGEX = compile_re(DOMAIN_REGEX)
|
||||
|
||||
# __END__
|
||||
@@ -173,10 +173,13 @@ class SettingsLoader:
|
||||
args_overrride.append(key)
|
||||
if skip:
|
||||
continue
|
||||
settings[config_id][key] = [
|
||||
__value.replace(" ", "")
|
||||
for __value in settings[config_id][key].split(split_char)
|
||||
]
|
||||
if settings[config_id][key]:
|
||||
settings[config_id][key] = [
|
||||
__value.replace(" ", "")
|
||||
for __value in settings[config_id][key].split(split_char)
|
||||
]
|
||||
else:
|
||||
settings[config_id][key] = []
|
||||
except KeyError as e:
|
||||
raise ValueError(self.__print(
|
||||
f"[!] Cannot read [{config_id}] block because the entry [{e}] could not be found",
|
||||
@@ -278,11 +281,9 @@ class SettingsLoader:
|
||||
error = True
|
||||
self.__print(f"[!] Missing content entry for: {entry}", 'ERROR')
|
||||
if error is True:
|
||||
self.__print("[!] Missing or incorrect settings data. Cannot proceed", 'CRITICAL')
|
||||
raise ValueError(
|
||||
self.__print(
|
||||
"[!] Missing or incorrect settings data. Cannot proceed: " + "; ".join(self.__error_msg),
|
||||
'CRITICAL'
|
||||
)
|
||||
"Missing or incorrect settings data. Cannot proceed: " + "; ".join(self.__error_msg)
|
||||
)
|
||||
# set empty
|
||||
for [entry, empty_set] in entry_set_empty.items():
|
||||
@@ -576,7 +577,7 @@ class SettingsLoader:
|
||||
self.log.logger.log(Log.get_log_level_int(level), msg, stacklevel=2)
|
||||
if self.log is None or self.always_print:
|
||||
if print_error:
|
||||
print(msg)
|
||||
print(f"[SettingsLoader] {msg}")
|
||||
if level == 'ERROR':
|
||||
# remove any prefix [!] for error message list
|
||||
self.__error_msg.append(msg.replace('[!] ', '').strip())
|
||||
|
||||
@@ -4,6 +4,8 @@ Send email wrapper
|
||||
|
||||
import smtplib
|
||||
from email.message import EmailMessage
|
||||
from email.header import Header
|
||||
from email.utils import formataddr, parseaddr
|
||||
from typing import TYPE_CHECKING, Any
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
@@ -133,21 +135,30 @@ class SendEmail:
|
||||
_subject = template["subject"]
|
||||
_body = template["body"]
|
||||
for key, value in replace.items():
|
||||
_subject = _subject.replace(f"{{{{{key}}}}}", value)
|
||||
_body = _body.replace(f"{{{{{key}}}}}", value)
|
||||
placeholder = f"{{{{{key}}}}}"
|
||||
_subject = _subject.replace(placeholder, value)
|
||||
_body = _body.replace(placeholder, value)
|
||||
name, addr = parseaddr(from_email)
|
||||
if name:
|
||||
# Encode the name part with MIME encoding
|
||||
encoded_name = str(Header(name, 'utf-8'))
|
||||
from_email_encoded = formataddr((encoded_name, addr))
|
||||
else:
|
||||
from_email_encoded = from_email
|
||||
# create a simple email and add subhect, from email
|
||||
msg_email = EmailMessage()
|
||||
# msg.set_content(_body, charset='utf-8', cte='quoted-printable')
|
||||
msg_email.set_content(_body, charset="utf-8")
|
||||
msg_email["Subject"] = _subject
|
||||
msg_email["From"] = from_email
|
||||
msg_email["From"] = from_email_encoded
|
||||
# push to array for sening
|
||||
msg.append(msg_email)
|
||||
return msg
|
||||
|
||||
def send_email_list(
|
||||
self,
|
||||
email: list[EmailMessage], receivers: list[str],
|
||||
emails: list[EmailMessage],
|
||||
receivers: list[str],
|
||||
combined_send: bool | None = None,
|
||||
test_only: bool | None = None
|
||||
):
|
||||
@@ -170,18 +181,27 @@ class SendEmail:
|
||||
smtp = smtplib.SMTP(smtp_host)
|
||||
except ConnectionRefusedError as e:
|
||||
self.log.error("Could not open SMTP connection to: %s, %s", smtp_host, e)
|
||||
# prepare receiver list
|
||||
receivers_encoded: list[str] = []
|
||||
for __receiver in receivers:
|
||||
to_name, to_addr = parseaddr(__receiver)
|
||||
if to_name:
|
||||
# Encode the name part with MIME encoding
|
||||
encoded_to_name = str(Header(to_name, 'utf-8'))
|
||||
receivers_encoded.append(formataddr((encoded_to_name, to_addr)))
|
||||
else:
|
||||
receivers_encoded.append(__receiver)
|
||||
# loop over messages and then over recievers
|
||||
for msg in email:
|
||||
for msg in emails:
|
||||
if combined_send is True:
|
||||
msg["To"] = ", ".join(receivers)
|
||||
msg["To"] = ", ".join(receivers_encoded)
|
||||
if not self.settings.get('test'):
|
||||
if smtp is not None:
|
||||
smtp.send_message(msg, msg["From"], receivers)
|
||||
smtp.send_message(msg, msg["From"], receivers_encoded)
|
||||
else:
|
||||
self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
|
||||
else:
|
||||
for receiver in receivers:
|
||||
# send to
|
||||
for receiver in receivers_encoded:
|
||||
self.log.debug(f"===> Send to: {receiver}")
|
||||
if "To" in msg:
|
||||
msg.replace_header("To", receiver)
|
||||
|
||||
@@ -4,11 +4,38 @@ Various dictionary, object and list hashers
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
from typing import Any
|
||||
from typing import Any, cast, Sequence
|
||||
|
||||
|
||||
def hash_object(obj: Any) -> str:
|
||||
"""
|
||||
RECOMMENDED for new use
|
||||
Create a hash for any dict or list with mixed key types
|
||||
|
||||
Arguments:
|
||||
obj {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
def normalize(o: Any) -> Any:
|
||||
if isinstance(o, dict):
|
||||
# Sort by repr of keys to handle mixed types (str, int, etc.)
|
||||
o = cast(dict[Any, Any], o)
|
||||
return tuple(sorted((repr(k), normalize(v)) for k, v in o.items()))
|
||||
if isinstance(o, (list, tuple)):
|
||||
o = cast(Sequence[Any], o)
|
||||
return tuple(normalize(item) for item in o)
|
||||
return repr(o)
|
||||
|
||||
normalized = normalize(obj)
|
||||
return hashlib.sha256(str(normalized).encode()).hexdigest()
|
||||
|
||||
|
||||
def dict_hash_frozen(data: dict[Any, Any]) -> int:
|
||||
"""
|
||||
NOT RECOMMENDED, use dict_hash_crc or hash_object instead
|
||||
If used, DO NOT CHANGE
|
||||
hash a dict via freeze
|
||||
|
||||
Args:
|
||||
@@ -22,18 +49,25 @@ def dict_hash_frozen(data: dict[Any, Any]) -> int:
|
||||
|
||||
def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str:
|
||||
"""
|
||||
Create a sha256 hash over dict
|
||||
LEGACY METHOD, must be kept for fallback, if used by other code, DO NOT CHANGE
|
||||
Create a sha256 hash over dict or list
|
||||
alternative for
|
||||
dict_hash_frozen
|
||||
|
||||
Args:
|
||||
data (dict | list): _description_
|
||||
data (dict[Any, Any] | list[Any]): _description_
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
str: sha256 hash, prefiex with HO_ if fallback used
|
||||
"""
|
||||
return hashlib.sha256(
|
||||
json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8')
|
||||
).hexdigest()
|
||||
try:
|
||||
return hashlib.sha256(
|
||||
# IT IS IMPORTANT THAT THE BELOW CALL STAYS THE SAME AND DOES NOT CHANGE OR WE WILL GET DIFFERENT HASHES
|
||||
# separators=(',', ':') to get rid of spaces, but if this is used the hash will be different, DO NOT ADD
|
||||
json.dumps(data, sort_keys=True, ensure_ascii=True, default=str).encode('utf-8')
|
||||
).hexdigest()
|
||||
except TypeError:
|
||||
# Fallback tod different hasher, will return DIFFERENT hash than above, so only usable in int/str key mixes
|
||||
return "HO_" + hash_object(data)
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
List type helpers
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Any, Sequence
|
||||
|
||||
|
||||
@@ -44,4 +45,31 @@ def is_list_in_list(
|
||||
# Get the difference and extract just the values
|
||||
return [item for item, _ in set_a - set_b]
|
||||
|
||||
|
||||
def make_unique_list_of_dicts(dict_list: list[Any]) -> list[Any]:
|
||||
"""
|
||||
Create a list of unique dictionary entries
|
||||
|
||||
Arguments:
|
||||
dict_list {list[Any]} -- _description_
|
||||
|
||||
Returns:
|
||||
list[Any] -- _description_
|
||||
"""
|
||||
try:
|
||||
# try json dumps, can fail with int and str index types
|
||||
return list(
|
||||
{
|
||||
json.dumps(d, sort_keys=True, ensure_ascii=True, separators=(',', ':')): d
|
||||
for d in dict_list
|
||||
}.values()
|
||||
)
|
||||
except TypeError:
|
||||
# Fallback for non-serializable entries, slow but works
|
||||
unique: list[Any] = []
|
||||
for d in dict_list:
|
||||
if d not in unique:
|
||||
unique.append(d)
|
||||
return unique
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -602,9 +602,9 @@ class Log(LogParent):
|
||||
__setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
|
||||
default_log_settings[__log_entry] = __setting
|
||||
# check console log type
|
||||
default_log_settings['console_format_type'] = cast('ConsoleFormat', log_settings.get(
|
||||
'console_format_type', self.DEFAULT_LOG_SETTINGS['console_format_type']
|
||||
))
|
||||
if (console_format_type := log_settings.get('console_format_type')) is None:
|
||||
console_format_type = self.DEFAULT_LOG_SETTINGS['console_format_type']
|
||||
default_log_settings['console_format_type'] = cast('ConsoleFormat', console_format_type)
|
||||
# check log queue
|
||||
__setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue'])
|
||||
if __setting is not None:
|
||||
@@ -774,6 +774,16 @@ class Log(LogParent):
|
||||
self.__set_console_formatter(console_format_type)
|
||||
)
|
||||
|
||||
def get_console_formatter(self) -> ConsoleFormat:
|
||||
"""
|
||||
Get the current console formatter, this the settings type
|
||||
Note that if eg "ALL" is set it will return the combined information but not the ALL flag name itself
|
||||
|
||||
Returns:
|
||||
ConsoleFormat -- _description_
|
||||
"""
|
||||
return self.log_settings['console_format_type']
|
||||
|
||||
# MARK: console handler
|
||||
def __create_console_handler(
|
||||
self, handler_name: str,
|
||||
|
||||
@@ -3,32 +3,61 @@ requests lib interface
|
||||
V2 call type
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
import warnings
|
||||
from typing import Any, TypedDict, cast
|
||||
import requests
|
||||
# to hide the verfiy warnings because of the bad SSL settings from Netskope, Akamai, etc
|
||||
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
||||
from requests import exceptions
|
||||
|
||||
|
||||
class ErrorResponse:
|
||||
"""
|
||||
Error response structure. This is returned if a request could not be completed
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
code: int,
|
||||
message: str,
|
||||
action: str,
|
||||
url: str,
|
||||
exception: exceptions.InvalidSchema | exceptions.ReadTimeout | exceptions.ConnectionError | None = None
|
||||
) -> None:
|
||||
self.code = code
|
||||
self.message = message
|
||||
self.action = action
|
||||
self.url = url
|
||||
self.exception_name = type(exception).__name__ if exception is not None else None
|
||||
self.exception_trace = exception if exception is not None else None
|
||||
|
||||
|
||||
class ProxyConfig(TypedDict):
|
||||
"""
|
||||
Socks proxy settings
|
||||
"""
|
||||
type: str
|
||||
host: str
|
||||
port: str
|
||||
|
||||
|
||||
class Caller:
|
||||
"""_summary_"""
|
||||
"""
|
||||
requests lib interface
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
header: dict[str, str],
|
||||
verify: bool = True,
|
||||
timeout: int = 20,
|
||||
proxy: dict[str, str] | None = None,
|
||||
proxy: ProxyConfig | None = None,
|
||||
verify: bool = True,
|
||||
ca_file: str | None = None
|
||||
):
|
||||
self.headers = header
|
||||
self.timeout: int = timeout
|
||||
self.cafile = ca_file
|
||||
self.ca_file = ca_file
|
||||
self.verify = verify
|
||||
self.proxy = proxy
|
||||
self.proxy = cast(dict[str, str], proxy) if proxy is not None else None
|
||||
|
||||
def __timeout(self, timeout: int | None) -> int:
|
||||
if timeout is not None:
|
||||
if timeout is not None and timeout >= 0:
|
||||
return timeout
|
||||
return self.timeout
|
||||
|
||||
@@ -39,7 +68,7 @@ class Caller:
|
||||
data: dict[str, Any] | None = None,
|
||||
params: dict[str, Any] | None = None,
|
||||
timeout: int | None = None
|
||||
) -> requests.Response | None:
|
||||
) -> requests.Response | ErrorResponse:
|
||||
"""
|
||||
call wrapper, on error returns None
|
||||
|
||||
@@ -56,67 +85,96 @@ class Caller:
|
||||
if data is None:
|
||||
data = {}
|
||||
try:
|
||||
response = None
|
||||
if action == "get":
|
||||
response = requests.get(
|
||||
return requests.get(
|
||||
url,
|
||||
params=params,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
proxies=self.proxy,
|
||||
cert=self.ca_file
|
||||
)
|
||||
elif action == "post":
|
||||
response = requests.post(
|
||||
if action == "post":
|
||||
return requests.post(
|
||||
url,
|
||||
params=params,
|
||||
json=data,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
proxies=self.proxy,
|
||||
cert=self.ca_file
|
||||
)
|
||||
elif action == "put":
|
||||
response = requests.put(
|
||||
if action == "put":
|
||||
return requests.put(
|
||||
url,
|
||||
params=params,
|
||||
json=data,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
proxies=self.proxy,
|
||||
cert=self.ca_file
|
||||
)
|
||||
elif action == "patch":
|
||||
response = requests.patch(
|
||||
if action == "patch":
|
||||
return requests.patch(
|
||||
url,
|
||||
params=params,
|
||||
json=data,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
proxies=self.proxy,
|
||||
cert=self.ca_file
|
||||
)
|
||||
elif action == "delete":
|
||||
response = requests.delete(
|
||||
if action == "delete":
|
||||
return requests.delete(
|
||||
url,
|
||||
params=params,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
proxies=self.proxy,
|
||||
cert=self.ca_file
|
||||
)
|
||||
return response
|
||||
except requests.exceptions.InvalidSchema as e:
|
||||
print(f"Invalid URL during '{action}' for {url}:\n\t{e}")
|
||||
return None
|
||||
except requests.exceptions.ReadTimeout as e:
|
||||
print(f"Timeout ({self.timeout}s) during '{action}' for {url}:\n\t{e}")
|
||||
return None
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print(f"Connection error during '{action}' for {url}:\n\t{e}")
|
||||
return None
|
||||
return ErrorResponse(
|
||||
100,
|
||||
f"Unsupported action '{action}'",
|
||||
action,
|
||||
url
|
||||
)
|
||||
except exceptions.InvalidSchema as e:
|
||||
return ErrorResponse(
|
||||
200,
|
||||
f"Invalid URL during '{action}' for {url}",
|
||||
action,
|
||||
url,
|
||||
e
|
||||
)
|
||||
except exceptions.ReadTimeout as e:
|
||||
return ErrorResponse(
|
||||
300,
|
||||
f"Timeout ({self.timeout}s) during '{action}' for {url}",
|
||||
action,
|
||||
url,
|
||||
e
|
||||
)
|
||||
except exceptions.ConnectionError as e:
|
||||
return ErrorResponse(
|
||||
400,
|
||||
f"Connection error during '{action}' for {url}",
|
||||
action,
|
||||
url,
|
||||
e
|
||||
)
|
||||
|
||||
def get(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
|
||||
def get(
|
||||
self,
|
||||
url: str,
|
||||
params: dict[str, Any] | None = None,
|
||||
timeout: int | None = None
|
||||
) -> requests.Response | ErrorResponse:
|
||||
"""
|
||||
get data
|
||||
|
||||
@@ -127,11 +185,15 @@ class Caller:
|
||||
Returns:
|
||||
requests.Response: _description_
|
||||
"""
|
||||
return self.__call('get', url, params=params)
|
||||
return self.__call('get', url, params=params, timeout=timeout)
|
||||
|
||||
def post(
|
||||
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
|
||||
) -> requests.Response | None:
|
||||
self,
|
||||
url: str,
|
||||
data: dict[str, Any] | None = None,
|
||||
params: dict[str, Any] | None = None,
|
||||
timeout: int | None = None
|
||||
) -> requests.Response | ErrorResponse:
|
||||
"""
|
||||
post data
|
||||
|
||||
@@ -143,11 +205,15 @@ class Caller:
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('post', url, data, params)
|
||||
return self.__call('post', url, data, params, timeout=timeout)
|
||||
|
||||
def put(
|
||||
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
|
||||
) -> requests.Response | None:
|
||||
self,
|
||||
url: str,
|
||||
data: dict[str, Any] | None = None,
|
||||
params: dict[str, Any] | None = None,
|
||||
timeout: int | None = None
|
||||
) -> requests.Response | ErrorResponse:
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
@@ -158,11 +224,15 @@ class Caller:
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('put', url, data, params)
|
||||
return self.__call('put', url, data, params, timeout=timeout)
|
||||
|
||||
def patch(
|
||||
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
|
||||
) -> requests.Response | None:
|
||||
self,
|
||||
url: str,
|
||||
data: dict[str, Any] | None = None,
|
||||
params: dict[str, Any] | None = None,
|
||||
timeout: int | None = None
|
||||
) -> requests.Response | ErrorResponse:
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
@@ -173,9 +243,14 @@ class Caller:
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('patch', url, data, params)
|
||||
return self.__call('patch', url, data, params, timeout=timeout)
|
||||
|
||||
def delete(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
|
||||
def delete(
|
||||
self,
|
||||
url: str,
|
||||
params: dict[str, Any] | None = None,
|
||||
timeout: int | None = None
|
||||
) -> requests.Response | ErrorResponse:
|
||||
"""
|
||||
delete
|
||||
|
||||
@@ -186,6 +261,6 @@ class Caller:
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('delete', url, params=params)
|
||||
return self.__call('delete', url, params=params, timeout=timeout)
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -2,14 +2,28 @@
|
||||
Test check andling for regex checks
|
||||
"""
|
||||
|
||||
import re
|
||||
from corelibs.check_handling.regex_constants import DOMAIN_WITH_LOCALHOST_REGEX
|
||||
from corelibs_text_colors.text_colors import Colors
|
||||
from corelibs.check_handling.regex_constants import (
|
||||
compile_re, DOMAIN_WITH_LOCALHOST_REGEX, EMAIL_BASIC_REGEX, NAME_EMAIL_BASIC_REGEX, SUB_EMAIL_BASIC_REGEX
|
||||
)
|
||||
from corelibs.check_handling.regex_constants_compiled import (
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX, COMPILED_EMAIL_BASIC_REGEX,
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX, COMPILED_NAME_EMAIL_BASIC_REGEX
|
||||
)
|
||||
|
||||
NAME_EMAIL_SIMPLE_REGEX = r"""
|
||||
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
|
||||
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
|
||||
<(?P<email3>[^>]+)>|
|
||||
(?P<email4>[^\s<>]+))\s*$
|
||||
"""
|
||||
|
||||
|
||||
def main():
|
||||
def domain_test():
|
||||
"""
|
||||
Test regex checks
|
||||
domain regex test
|
||||
"""
|
||||
print("=" * 30)
|
||||
test_domains = [
|
||||
"example.com",
|
||||
"localhost",
|
||||
@@ -18,7 +32,7 @@ def main():
|
||||
"some-domain.org"
|
||||
]
|
||||
|
||||
regex_domain_check = re.compile(DOMAIN_WITH_LOCALHOST_REGEX)
|
||||
regex_domain_check = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
|
||||
print(f"REGEX: {DOMAIN_WITH_LOCALHOST_REGEX}")
|
||||
print(f"Check regex: {regex_domain_check.search('localhost')}")
|
||||
|
||||
@@ -29,6 +43,66 @@ def main():
|
||||
print(f"Did not match: {domain}")
|
||||
|
||||
|
||||
def email_test():
|
||||
"""
|
||||
email regex test
|
||||
"""
|
||||
print("=" * 30)
|
||||
email_list = """
|
||||
e@bar.com
|
||||
<f@foobar.com>
|
||||
"Master" <foobar@bar.com>
|
||||
"not valid" not@valid.com
|
||||
also not valid not@valid.com
|
||||
some header <something@bar.com>
|
||||
test master <master@master.com>
|
||||
日本語 <japan@jp.net>
|
||||
"ひほん カケ苦" <foo@bar.com>
|
||||
single@entry.com
|
||||
arsch@popsch.com
|
||||
test open <open@open.com>
|
||||
"""
|
||||
|
||||
print(f"REGEX: SUB_EMAIL_BASIC_REGEX: {SUB_EMAIL_BASIC_REGEX}")
|
||||
print(f"REGEX: EMAIL_BASIC_REGEX: {EMAIL_BASIC_REGEX}")
|
||||
print(f"REGEX: COMPILED_NAME_EMAIL_SIMPLE_REGEX: {COMPILED_NAME_EMAIL_SIMPLE_REGEX}")
|
||||
print(f"REGEX: NAME_EMAIL_BASIC_REGEX: {NAME_EMAIL_BASIC_REGEX}")
|
||||
|
||||
basic_email = COMPILED_EMAIL_BASIC_REGEX
|
||||
sub_basic_email = compile_re(SUB_EMAIL_BASIC_REGEX)
|
||||
simple_name_email_regex = COMPILED_NAME_EMAIL_SIMPLE_REGEX
|
||||
full_name_email_regex = COMPILED_NAME_EMAIL_BASIC_REGEX
|
||||
for email in email_list.splitlines():
|
||||
email = email.strip()
|
||||
if not email:
|
||||
continue
|
||||
print(f">>> Testing: {email}")
|
||||
if not basic_email.match(email):
|
||||
print(f"{Colors.red}[EMAIL ] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[EMAIL ] Matched : {email}{Colors.reset}")
|
||||
if not sub_basic_email.match(email):
|
||||
print(f"{Colors.red}[SUB ] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[SUB ] Matched : {email}{Colors.reset}")
|
||||
if not simple_name_email_regex.match(email):
|
||||
print(f"{Colors.red}[SIMPLE] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[SIMPLE] Matched : {email}{Colors.reset}")
|
||||
if not full_name_email_regex.match(email):
|
||||
print(f"{Colors.red}[FULL ] No match: {email}{Colors.reset}")
|
||||
else:
|
||||
print(f"{Colors.green}[FULL ] Matched : {email}{Colors.reset}")
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Test regex checks
|
||||
"""
|
||||
domain_test()
|
||||
email_test()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
@@ -12,10 +12,12 @@ some_match_list=foo,bar
|
||||
test_list=a,b,c,d f, g h
|
||||
other_list=a|b|c|d|
|
||||
third_list=xy|ab|df|fg
|
||||
empty_list=
|
||||
str_length=foobar
|
||||
int_range=20
|
||||
int_range_not_set=
|
||||
int_range_not_set_empty_set=5
|
||||
bool_var=True
|
||||
#
|
||||
match_target=foo
|
||||
match_target_list=foo,bar,baz
|
||||
|
||||
@@ -21,11 +21,6 @@ def main():
|
||||
Main run
|
||||
"""
|
||||
|
||||
value = "2025/1/1"
|
||||
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
|
||||
result = regex_c.search(value)
|
||||
print(f"regex {regex_c} check against {value} -> {result}")
|
||||
|
||||
# for log testing
|
||||
log = Log(
|
||||
log_path=ROOT_PATH.joinpath(LOG_DIR, 'settings_loader.log'),
|
||||
@@ -37,6 +32,11 @@ def main():
|
||||
)
|
||||
log.logger.info('Settings loader')
|
||||
|
||||
value = "2025/1/1"
|
||||
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
|
||||
result = regex_c.search(value)
|
||||
log.info(f"regex {regex_c} check against {value} -> {result}")
|
||||
|
||||
sl = SettingsLoader(
|
||||
{
|
||||
'overload_from_args': 'OVERLOAD from ARGS',
|
||||
@@ -69,6 +69,9 @@ def main():
|
||||
"split:|",
|
||||
"check:string.alphanumeric"
|
||||
],
|
||||
"empty_list": [
|
||||
"split:,",
|
||||
],
|
||||
"str_length": [
|
||||
"length:2-10"
|
||||
],
|
||||
@@ -81,6 +84,7 @@ def main():
|
||||
"int_range_not_set_empty_set": [
|
||||
"empty:"
|
||||
],
|
||||
"bool_var": ["convert:bool"],
|
||||
"match_target": ["matching:foo"],
|
||||
"match_target_list": ["split:,", "matching:foo|bar|baz",],
|
||||
"match_source_a": ["in:match_target"],
|
||||
|
||||
@@ -24,12 +24,19 @@ def main() -> None:
|
||||
"lookup_value_c": "B02",
|
||||
"replace_value": "R02",
|
||||
},
|
||||
{
|
||||
"lookup_value_p": "A03",
|
||||
"lookup_value_c": "B03",
|
||||
"replace_value": "R03",
|
||||
},
|
||||
]
|
||||
test_foo = ArraySearchList(
|
||||
key = "lookup_value_p",
|
||||
value = "A01"
|
||||
key="lookup_value_p",
|
||||
value="A01"
|
||||
)
|
||||
print(test_foo)
|
||||
result = find_in_array_from_list(data, [test_foo])
|
||||
print(f"Search A: {dump_data(test_foo)} -> {dump_data(result)}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
@@ -38,12 +45,122 @@ def main() -> None:
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": "B01"
|
||||
},
|
||||
]
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search B: {dump_data(search)} -> {dump_data(result)}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": "A01"
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": "B01"
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": "B02"
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search C: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search C raised KeyError: {e}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": "A01"
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": ["B01", "B02"]
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search D: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search D raised KeyError: {e}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": ["A01", "A03"]
|
||||
},
|
||||
{
|
||||
"key": "lookup_value_c",
|
||||
"value": ["B01", "B02"]
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search E: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search E raised KeyError: {e}")
|
||||
|
||||
search: list[ArraySearchList] = [
|
||||
{
|
||||
"key": "lookup_value_p",
|
||||
"value": "NOT FOUND"
|
||||
},
|
||||
]
|
||||
try:
|
||||
result = find_in_array_from_list(data, search)
|
||||
print(f"Search F: {dump_data(search)} -> {dump_data(result)}")
|
||||
except KeyError as e:
|
||||
print(f"Search F raised KeyError: {e}")
|
||||
|
||||
data = [
|
||||
{
|
||||
"sd_user_id": "1593",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1592",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1596",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1594",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1595",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1861",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1862",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
},
|
||||
{
|
||||
"sd_user_id": "1860",
|
||||
"email": "",
|
||||
"employee_id": ""
|
||||
}
|
||||
]
|
||||
|
||||
result = find_in_array_from_list(data, search)
|
||||
|
||||
print(f"Search {dump_data(search)} -> {dump_data(result)}")
|
||||
result = find_in_array_from_list(data, [ArraySearchList(
|
||||
key="sd_user_id",
|
||||
value="1593"
|
||||
)])
|
||||
print(f"Search F: -> {dump_data(result)}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
test list helpers
|
||||
"""
|
||||
|
||||
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list
|
||||
from typing import Any
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list, make_unique_list_of_dicts
|
||||
from corelibs.iterator_handling.fingerprint import dict_hash_crc
|
||||
|
||||
|
||||
def __test_is_list_in_list_a():
|
||||
@@ -18,9 +21,66 @@ def __convert_list():
|
||||
print(f"IN: {source} -> {result}")
|
||||
|
||||
|
||||
def __make_unique_list_of_dicts():
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
|
||||
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
|
||||
{"b": 2, "a": 1, "nested": {"y": 20, "x": 10}},
|
||||
{"b": 2, "a": 1, "nested": {"y": 20, "x": 30}},
|
||||
{"a": 3, "b": 4, "nested": {"x": 30, "y": 40}}
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list = [
|
||||
{"a": 1, 1: "one"},
|
||||
{1: "one", "a": 1},
|
||||
{"a": 2, 1: "one"}
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list = [
|
||||
{"a": 1, "b": [1, 2, 3]},
|
||||
{"b": [1, 2, 3], "a": 1},
|
||||
{"a": 1, "b": [1, 2, 4]},
|
||||
1, 2, "String", 1, "Foobar"
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list: list[Any] = [
|
||||
[],
|
||||
{},
|
||||
[],
|
||||
{},
|
||||
{"a": []},
|
||||
{"a": []},
|
||||
{"a": {}},
|
||||
{"a": {}},
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
dict_list: list[Any] = [
|
||||
(1, 2),
|
||||
(1, 2),
|
||||
(2, 3),
|
||||
]
|
||||
unique_dicts = make_unique_list_of_dicts(dict_list)
|
||||
dhf = dict_hash_crc(unique_dicts)
|
||||
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
|
||||
|
||||
|
||||
def main():
|
||||
"""List helpers test runner"""
|
||||
__test_is_list_in_list_a()
|
||||
__convert_list()
|
||||
__make_unique_list_of_dicts()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -27,7 +27,8 @@ def main():
|
||||
"per_run_log": True,
|
||||
# "console_format_type": ConsoleFormatSettings.NONE,
|
||||
# "console_format_type": ConsoleFormatSettings.MINIMAL,
|
||||
"console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL,
|
||||
# "console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL,
|
||||
"console_format_type": None,
|
||||
# "console_format_type": ConsoleFormat.NAME,
|
||||
# "console_format_type": (
|
||||
# ConsoleFormat.TIME | ConsoleFormat.TIMEZONE | ConsoleFormat.LINENO | ConsoleFormat.LEVEL
|
||||
@@ -121,10 +122,16 @@ def main():
|
||||
|
||||
log.set_log_level(Log.CONSOLE_HANDLER, LoggingLevel.DEBUG)
|
||||
log.debug('Current logging format: %s', log.log_settings['console_format_type'])
|
||||
log.debug('Current console formatter: %s', log.get_console_formatter())
|
||||
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
|
||||
log.info('Does hit show less')
|
||||
log.info('Does hit show less A')
|
||||
log.debug('Current console formatter after A: %s', log.get_console_formatter())
|
||||
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
|
||||
log.info('Does hit show less B')
|
||||
log.debug('Current console formatter after B: %s', log.get_console_formatter())
|
||||
log.update_console_formatter(ConsoleFormatSettings.ALL)
|
||||
log.info('Does hit show less C')
|
||||
log.debug('Current console formatter after C: %s', log.get_console_formatter())
|
||||
print(f"*** Any handler is minimum level ERROR: {log.any_handler_is_minimum_level(LoggingLevel.ERROR)}")
|
||||
print(f"*** Any handler is minimum level DEBUG: {log.any_handler_is_minimum_level(LoggingLevel.DEBUG)}")
|
||||
|
||||
|
||||
@@ -8,10 +8,21 @@ import re
|
||||
import pytest
|
||||
from corelibs.check_handling.regex_constants import (
|
||||
compile_re,
|
||||
SUB_EMAIL_BASIC_REGEX,
|
||||
EMAIL_BASIC_REGEX,
|
||||
NAME_EMAIL_SIMPLE_REGEX,
|
||||
NAME_EMAIL_BASIC_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
DOMAIN_REGEX,
|
||||
DOMAIN_REGEX
|
||||
)
|
||||
from corelibs.check_handling.regex_constants_compiled import (
|
||||
COMPILED_EMAIL_BASIC_REGEX,
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
|
||||
COMPILED_NAME_EMAIL_BASIC_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
COMPILED_DOMAIN_REGEX,
|
||||
)
|
||||
|
||||
|
||||
@@ -48,7 +59,7 @@ class TestEmailBasicRegex:
|
||||
@pytest.fixture
|
||||
def email_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled email regex pattern."""
|
||||
return compile_re(EMAIL_BASIC_REGEX)
|
||||
return COMPILED_EMAIL_BASIC_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_email", [
|
||||
"user@example.com",
|
||||
@@ -123,13 +134,272 @@ class TestEmailBasicRegex:
|
||||
assert not email_pattern.match(email)
|
||||
|
||||
|
||||
class TestSubEmailBasicRegex:
|
||||
"""Test cases for SUB_EMAIL_BASIC_REGEX pattern (without anchors)."""
|
||||
|
||||
@pytest.fixture
|
||||
def sub_email_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled sub email regex pattern."""
|
||||
return compile_re(rf"^{SUB_EMAIL_BASIC_REGEX}$")
|
||||
|
||||
@pytest.mark.parametrize("valid_email", [
|
||||
"user@example.com",
|
||||
"test.user@example.com",
|
||||
"user+tag@example.co.uk",
|
||||
"first.last@subdomain.example.com",
|
||||
"user123@test-domain.com",
|
||||
"a@example.com",
|
||||
"user_name@example.com",
|
||||
"user-name@example.com",
|
||||
"user@sub.domain.example.com",
|
||||
"test!#$%&'*+-/=?^_`{|}~@example.com",
|
||||
"1234567890@example.com",
|
||||
])
|
||||
def test_valid_emails_match(self, sub_email_pattern: re.Pattern[str], valid_email: str) -> None:
|
||||
"""Test that valid email addresses match SUB_EMAIL_BASIC_REGEX."""
|
||||
assert sub_email_pattern.match(valid_email), (
|
||||
f"Failed to match valid email: {valid_email}"
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize("invalid_email", [
|
||||
"",
|
||||
"@example.com",
|
||||
"user@",
|
||||
"user",
|
||||
"user@.com",
|
||||
"user@domain",
|
||||
"user @example.com",
|
||||
".user@example.com",
|
||||
"user@-example.com",
|
||||
"user@example-.com",
|
||||
"user@example.c",
|
||||
"user@example.toolong",
|
||||
])
|
||||
def test_invalid_emails_no_match(self, sub_email_pattern: re.Pattern[str], invalid_email: str) -> None:
|
||||
"""Test that invalid emails don't match SUB_EMAIL_BASIC_REGEX."""
|
||||
assert not sub_email_pattern.match(invalid_email), (
|
||||
f"Incorrectly matched invalid email: {invalid_email}"
|
||||
)
|
||||
|
||||
def test_sub_email_max_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
|
||||
"""Test email with maximum local part length (64 characters)."""
|
||||
local_part = "a" * 64
|
||||
email = f"{local_part}@example.com"
|
||||
assert sub_email_pattern.match(email)
|
||||
|
||||
def test_sub_email_exceeds_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
|
||||
"""Test email exceeding maximum local part length."""
|
||||
local_part = "a" * 65
|
||||
email = f"{local_part}@example.com"
|
||||
assert not sub_email_pattern.match(email)
|
||||
|
||||
|
||||
class TestNameEmailSimpleRegex:
|
||||
"""Test cases for NAME_EMAIL_SIMPLE_REGEX pattern."""
|
||||
|
||||
@pytest.fixture
|
||||
def name_email_simple_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled name+email simple regex pattern."""
|
||||
return COMPILED_NAME_EMAIL_SIMPLE_REGEX
|
||||
|
||||
@pytest.mark.parametrize("test_input,expected_groups", [
|
||||
('"John Doe" <john@example.com>', {'name1': 'John Doe', 'email1': 'john@example.com'}),
|
||||
('John Doe <john@example.com>', {'name2': 'John Doe', 'email2': 'john@example.com'}),
|
||||
('<john@example.com>', {'email3': 'john@example.com'}),
|
||||
('john@example.com', {'email4': 'john@example.com'}),
|
||||
(' "Jane Smith" <jane@test.com> ', {'name1': 'Jane Smith', 'email1': 'jane@test.com'}),
|
||||
('Bob <bob@test.org>', {'name2': 'Bob', 'email2': 'bob@test.org'}),
|
||||
])
|
||||
def test_valid_name_email_combinations(
|
||||
self, name_email_simple_pattern: re.Pattern[str], test_input: str, expected_groups: dict[str, str]
|
||||
) -> None:
|
||||
"""Test that valid name+email combinations match and extract correct groups."""
|
||||
match = name_email_simple_pattern.match(test_input)
|
||||
assert match is not None, f"Failed to match: {test_input}"
|
||||
|
||||
# Check that expected groups are present and match
|
||||
for group_name, expected_value in expected_groups.items():
|
||||
assert match.group(group_name) == expected_value, (
|
||||
f"Group {group_name} expected '{expected_value}', got '{match.group(group_name)}'"
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize("invalid_input", [
|
||||
"",
|
||||
"not an email",
|
||||
"<>",
|
||||
'"Name Only"',
|
||||
'Name <',
|
||||
'<email',
|
||||
'Name <<email@test.com>>',
|
||||
'Name <email@test.com',
|
||||
'Name email@test.com>',
|
||||
])
|
||||
def test_invalid_name_email_combinations(
|
||||
self, name_email_simple_pattern: re.Pattern[str], invalid_input: str
|
||||
) -> None:
|
||||
"""Test that invalid inputs don't match NAME_EMAIL_SIMPLE_REGEX."""
|
||||
assert not name_email_simple_pattern.match(invalid_input), (
|
||||
f"Incorrectly matched invalid input: {invalid_input}"
|
||||
)
|
||||
|
||||
def test_extract_name_from_quoted(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test extracting name from quoted format."""
|
||||
match = name_email_simple_pattern.match('"Alice Wonder" <alice@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'Alice Wonder'
|
||||
assert match.group('email1') == 'alice@example.com'
|
||||
|
||||
def test_extract_name_from_unquoted(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test extracting name from unquoted format."""
|
||||
match = name_email_simple_pattern.match('Bob Builder <bob@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name2') == 'Bob Builder'
|
||||
assert match.group('email2') == 'bob@example.com'
|
||||
|
||||
def test_email_only_in_brackets(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test email-only format in angle brackets."""
|
||||
match = name_email_simple_pattern.match('<charlie@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('email3') == 'charlie@example.com'
|
||||
|
||||
def test_email_only_plain(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test plain email format without brackets."""
|
||||
match = name_email_simple_pattern.match('dave@example.com')
|
||||
assert match is not None
|
||||
assert match.group('email4') == 'dave@example.com'
|
||||
|
||||
def test_whitespace_handling(
|
||||
self, name_email_simple_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test that leading/trailing whitespace is handled correctly."""
|
||||
match = name_email_simple_pattern.match(' "User Name" <user@example.com> ')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'User Name'
|
||||
assert match.group('email1') == 'user@example.com'
|
||||
|
||||
|
||||
class TestNameEmailBasicRegex:
|
||||
"""Test cases for NAME_EMAIL_BASIC_REGEX pattern with strict email validation."""
|
||||
|
||||
@pytest.fixture
|
||||
def name_email_basic_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled name+email basic regex pattern."""
|
||||
return COMPILED_NAME_EMAIL_BASIC_REGEX
|
||||
|
||||
@pytest.mark.parametrize("test_input,expected_name,expected_email", [
|
||||
('"John Doe" <john@example.com>', 'John Doe', 'john@example.com'),
|
||||
('John Doe <john@example.com>', 'John Doe', 'john@example.com'),
|
||||
('<john@example.com>', None, 'john@example.com'),
|
||||
('john@example.com', None, 'john@example.com'),
|
||||
(' "Jane Smith" <jane.smith@test.co.uk> ', 'Jane Smith', 'jane.smith@test.co.uk'),
|
||||
('Alice Wonder <alice+tag@example.com>', 'Alice Wonder', 'alice+tag@example.com'),
|
||||
])
|
||||
def test_valid_name_email_with_validation(
|
||||
self,
|
||||
name_email_basic_pattern: re.Pattern[str],
|
||||
test_input: str,
|
||||
expected_name: str | None,
|
||||
expected_email: str,
|
||||
) -> None:
|
||||
"""Test valid name+email with strict email validation."""
|
||||
match = name_email_basic_pattern.match(test_input)
|
||||
assert match is not None, f"Failed to match: {test_input}"
|
||||
|
||||
# Extract name and email from whichever group matched
|
||||
name = match.group('name1') or match.group('name2')
|
||||
email = (
|
||||
match.group('email1') or match.group('email2') or
|
||||
match.group('email3') or match.group('email4')
|
||||
)
|
||||
|
||||
assert name == expected_name, f"Expected name '{expected_name}', got '{name}'"
|
||||
assert email == expected_email, f"Expected email '{expected_email}', got '{email}'"
|
||||
|
||||
@pytest.mark.parametrize("invalid_input", [
|
||||
'"John Doe" <invalid.email>', # invalid email format
|
||||
'John Doe <@example.com>', # missing local part
|
||||
'<user@>', # missing domain
|
||||
'user@domain', # no TLD
|
||||
'"Name" <user @example.com>', # space in email
|
||||
'<.user@example.com>', # starts with dot
|
||||
'user@-example.com', # domain starts with hyphen
|
||||
'Name <user@example.c>', # TLD too short
|
||||
'Name <user@example.toolongdomain>', # TLD too long
|
||||
])
|
||||
def test_invalid_email_format_rejected(
|
||||
self, name_email_basic_pattern: re.Pattern[str], invalid_input: str
|
||||
) -> None:
|
||||
"""Test that inputs with invalid email formats are rejected."""
|
||||
assert not name_email_basic_pattern.match(invalid_input), (
|
||||
f"Incorrectly matched invalid input: {invalid_input}"
|
||||
)
|
||||
|
||||
def test_quoted_name_with_valid_email(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test quoted name format with valid email."""
|
||||
match = name_email_basic_pattern.match('"Alice Wonder" <alice@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'Alice Wonder'
|
||||
assert match.group('email1') == 'alice@example.com'
|
||||
|
||||
def test_unquoted_name_with_valid_email(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test unquoted name format with valid email."""
|
||||
match = name_email_basic_pattern.match('Bob Builder <bob@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name2') == 'Bob Builder'
|
||||
assert match.group('email2') == 'bob@example.com'
|
||||
|
||||
def test_email_only_formats(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test email-only formats (with and without brackets)."""
|
||||
# With brackets
|
||||
match1 = name_email_basic_pattern.match('<charlie@example.com>')
|
||||
assert match1 is not None
|
||||
assert match1.group('email3') == 'charlie@example.com'
|
||||
|
||||
# Without brackets
|
||||
match2 = name_email_basic_pattern.match('dave@example.com')
|
||||
assert match2 is not None
|
||||
assert match2.group('email4') == 'dave@example.com'
|
||||
|
||||
def test_whitespace_handling(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test that leading/trailing whitespace is handled correctly."""
|
||||
match = name_email_basic_pattern.match(' "User" <user@example.com> ')
|
||||
assert match is not None
|
||||
assert match.group('name1') == 'User'
|
||||
assert match.group('email1') == 'user@example.com'
|
||||
|
||||
def test_special_characters_in_local_part(
|
||||
self, name_email_basic_pattern: re.Pattern[str]
|
||||
) -> None:
|
||||
"""Test email with special characters in local part."""
|
||||
match = name_email_basic_pattern.match('Test User <test!#$%&\'*+-/=?^_`{|}~@example.com>')
|
||||
assert match is not None
|
||||
assert match.group('name2') == 'Test User'
|
||||
assert match.group('email2') == 'test!#$%&\'*+-/=?^_`{|}~@example.com'
|
||||
|
||||
|
||||
class TestDomainWithLocalhostRegex:
|
||||
"""Test cases for DOMAIN_WITH_LOCALHOST_REGEX pattern."""
|
||||
|
||||
@pytest.fixture
|
||||
def domain_localhost_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled domain with localhost regex pattern."""
|
||||
return compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
|
||||
return COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_domain", [
|
||||
"localhost",
|
||||
@@ -181,7 +451,7 @@ class TestDomainWithLocalhostPortRegex:
|
||||
@pytest.fixture
|
||||
def domain_localhost_port_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled domain and localhost with port pattern."""
|
||||
return compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
|
||||
return COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_domain", [
|
||||
"localhost",
|
||||
@@ -247,7 +517,7 @@ class TestDomainRegex:
|
||||
@pytest.fixture
|
||||
def domain_pattern(self) -> re.Pattern[str]:
|
||||
"""Fixture that returns compiled domain regex pattern."""
|
||||
return compile_re(DOMAIN_REGEX)
|
||||
return COMPILED_DOMAIN_REGEX
|
||||
|
||||
@pytest.mark.parametrize("valid_domain", [
|
||||
"example.com",
|
||||
@@ -306,6 +576,8 @@ class TestRegexPatternConsistency:
|
||||
"""Test that all regex patterns can be compiled without errors."""
|
||||
patterns = [
|
||||
EMAIL_BASIC_REGEX,
|
||||
NAME_EMAIL_SIMPLE_REGEX,
|
||||
NAME_EMAIL_BASIC_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
DOMAIN_REGEX,
|
||||
@@ -314,9 +586,24 @@ class TestRegexPatternConsistency:
|
||||
compiled = compile_re(pattern)
|
||||
assert isinstance(compiled, re.Pattern)
|
||||
|
||||
def test_compiled_patterns_are_patterns(self) -> None:
|
||||
"""Test that all COMPILED_ constants are Pattern objects."""
|
||||
compiled_patterns = [
|
||||
COMPILED_EMAIL_BASIC_REGEX,
|
||||
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
|
||||
COMPILED_NAME_EMAIL_BASIC_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
|
||||
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||
COMPILED_DOMAIN_REGEX,
|
||||
]
|
||||
for pattern in compiled_patterns:
|
||||
assert isinstance(pattern, re.Pattern)
|
||||
|
||||
def test_domain_patterns_are_strings(self) -> None:
|
||||
"""Test that all regex constants are strings."""
|
||||
assert isinstance(EMAIL_BASIC_REGEX, str)
|
||||
assert isinstance(NAME_EMAIL_SIMPLE_REGEX, str)
|
||||
assert isinstance(NAME_EMAIL_BASIC_REGEX, str)
|
||||
assert isinstance(DOMAIN_WITH_LOCALHOST_REGEX, str)
|
||||
assert isinstance(DOMAIN_WITH_LOCALHOST_PORT_REGEX, str)
|
||||
assert isinstance(DOMAIN_REGEX, str)
|
||||
@@ -325,8 +612,8 @@ class TestRegexPatternConsistency:
|
||||
"""Test that domain patterns follow expected hierarchy."""
|
||||
# DOMAIN_WITH_LOCALHOST_PORT_REGEX should accept everything
|
||||
# DOMAIN_WITH_LOCALHOST_REGEX accepts
|
||||
domain_localhost = compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
|
||||
domain_localhost_port = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
|
||||
domain_localhost = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
|
||||
domain_localhost_port = COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
|
||||
|
||||
test_cases = ["example.com", "subdomain.example.com", "localhost"]
|
||||
for test_case in test_cases:
|
||||
|
||||
@@ -16,7 +16,7 @@ class TestSettingsLoaderInit:
|
||||
|
||||
def test_init_with_valid_config_file(self, tmp_path: Path):
|
||||
"""Test initialization with a valid config file"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[Section]\nkey=value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -35,7 +35,7 @@ class TestSettingsLoaderInit:
|
||||
|
||||
def test_init_with_missing_config_file(self, tmp_path: Path):
|
||||
"""Test initialization with missing config file"""
|
||||
config_file = tmp_path / "missing.ini"
|
||||
config_file = tmp_path.joinpath("missing.ini")
|
||||
|
||||
loader = SettingsLoader(
|
||||
args={},
|
||||
@@ -60,7 +60,7 @@ class TestSettingsLoaderInit:
|
||||
|
||||
def test_init_with_log(self, tmp_path: Path):
|
||||
"""Test initialization with Log object"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[Section]\nkey=value\n")
|
||||
mock_log = Mock(spec=Log)
|
||||
|
||||
@@ -80,7 +80,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_basic(self, tmp_path: Path):
|
||||
"""Test loading basic settings without validation"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nkey1=value1\nkey2=value2\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -90,7 +90,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_with_missing_section(self, tmp_path: Path):
|
||||
"""Test loading settings with missing section"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[OtherSection]\nkey=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -100,7 +100,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_allow_not_exist(self, tmp_path: Path):
|
||||
"""Test loading settings with allow_not_exist flag"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[OtherSection]\nkey=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -110,7 +110,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_mandatory_field_present(self, tmp_path: Path):
|
||||
"""Test mandatory field validation when field is present"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nrequired_field=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -123,7 +123,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_mandatory_field_missing(self, tmp_path: Path):
|
||||
"""Test mandatory field validation when field is missing"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nother_field=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -136,7 +136,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_mandatory_field_empty(self, tmp_path: Path):
|
||||
"""Test mandatory field validation when field is empty"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nrequired_field=\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -149,7 +149,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_with_split(self, tmp_path: Path):
|
||||
"""Test splitting values into lists"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a,b,c,d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -162,7 +162,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_with_custom_split_char(self, tmp_path: Path):
|
||||
"""Test splitting with custom delimiter"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a|b|c|d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -175,7 +175,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_split_removes_spaces(self, tmp_path: Path):
|
||||
"""Test that split removes spaces from values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a, b , c , d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -188,7 +188,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_empty_split_char_fallback(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test fallback to default split char when empty"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=a,b,c\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -201,9 +201,22 @@ class TestLoadSettings:
|
||||
captured = capsys.readouterr()
|
||||
assert "fallback to:" in captured.out
|
||||
|
||||
def test_load_settings_split_empty_value(self, tmp_path: Path):
|
||||
"""Test that split on empty value results in empty list"""
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist_field=\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
result = loader.load_settings(
|
||||
"TestSection",
|
||||
{"list_field": ["split:,"]}
|
||||
)
|
||||
|
||||
assert result["list_field"] == []
|
||||
|
||||
def test_load_settings_convert_to_int(self, tmp_path: Path):
|
||||
"""Test converting values to int"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=123\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -217,7 +230,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_to_float(self, tmp_path: Path):
|
||||
"""Test converting values to float"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=123.45\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -231,7 +244,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_to_bool_true(self, tmp_path: Path):
|
||||
"""Test converting values to boolean True"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nflag1=true\nflag2=True\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -245,7 +258,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_to_bool_false(self, tmp_path: Path):
|
||||
"""Test converting values to boolean False"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nflag1=false\nflag2=False\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -259,7 +272,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_convert_invalid_type(self, tmp_path: Path):
|
||||
"""Test converting with invalid type raises error"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -272,7 +285,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_empty_set_to_none(self, tmp_path: Path):
|
||||
"""Test setting empty values to None"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nother=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -285,7 +298,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_empty_set_to_custom_value(self, tmp_path: Path):
|
||||
"""Test setting empty values to custom value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nother=value\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -298,7 +311,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_matching_valid(self, tmp_path: Path):
|
||||
"""Test matching validation with valid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nmode=production\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -311,7 +324,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_matching_invalid(self, tmp_path: Path):
|
||||
"""Test matching validation with invalid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nmode=invalid\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -324,7 +337,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_in_valid(self, tmp_path: Path):
|
||||
"""Test 'in' validation with valid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=b\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -340,7 +353,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_in_invalid(self, tmp_path: Path):
|
||||
"""Test 'in' validation with invalid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=d\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -356,7 +369,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_in_missing_target(self, tmp_path: Path):
|
||||
"""Test 'in' validation with missing target"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=a\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -369,7 +382,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_exact(self, tmp_path: Path):
|
||||
"""Test length validation with exact match"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -382,7 +395,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_exact_invalid(self, tmp_path: Path):
|
||||
"""Test length validation with exact match failure"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -395,7 +408,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_range(self, tmp_path: Path):
|
||||
"""Test length validation with range"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=testing\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -408,7 +421,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_min_only(self, tmp_path: Path):
|
||||
"""Test length validation with minimum only"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=testing\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -421,7 +434,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_length_max_only(self, tmp_path: Path):
|
||||
"""Test length validation with maximum only"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -434,7 +447,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_range_valid(self, tmp_path: Path):
|
||||
"""Test range validation with valid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=25\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -447,7 +460,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_range_invalid(self, tmp_path: Path):
|
||||
"""Test range validation with invalid value"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=100\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -460,7 +473,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_int_valid(self, tmp_path: Path):
|
||||
"""Test check:int with valid integer"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=12345\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -473,7 +486,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_int_cleanup(self, tmp_path: Path):
|
||||
"""Test check:int with cleanup"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nnumber=12a34b5\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -486,7 +499,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_email_valid(self, tmp_path: Path):
|
||||
"""Test check:string.email.basic with valid email"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nemail=test@example.com\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -499,7 +512,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_email_invalid(self, tmp_path: Path):
|
||||
"""Test check:string.email.basic with invalid email"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nemail=not-an-email\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -512,7 +525,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_override(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test command line arguments override config values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -530,7 +543,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_no_flag(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test default behavior (no args_override:yes) with list argument that has split"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=a,b,c\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -550,7 +563,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_list_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test that list arguments without split entry are skipped"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -570,7 +583,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_list_with_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test that list arguments with split entry and args_override:yes are applied"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=a,b,c\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -589,7 +602,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_no_with_mandatory(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test default behavior (no args_override:yes) with mandatory field and list args with split"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config1,config2\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -609,7 +622,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_no_with_mandatory_valid(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test default behavior with string args (always overrides due to current logic)"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -628,7 +641,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_args_string_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test that string arguments with args_override:yes work normally"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=config_value\n")
|
||||
|
||||
loader = SettingsLoader(
|
||||
@@ -647,7 +660,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_no_config_file_with_args(self, tmp_path: Path):
|
||||
"""Test loading settings without config file but with mandatory args"""
|
||||
config_file = tmp_path / "missing.ini"
|
||||
config_file = tmp_path.joinpath("missing.ini")
|
||||
|
||||
loader = SettingsLoader(
|
||||
args={"required": "value"},
|
||||
@@ -662,7 +675,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_no_config_file_missing_args(self, tmp_path: Path):
|
||||
"""Test loading settings without config file and missing args"""
|
||||
config_file = tmp_path / "missing.ini"
|
||||
config_file = tmp_path.joinpath("missing.ini")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
|
||||
@@ -674,7 +687,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_list_with_split(self, tmp_path: Path):
|
||||
"""Test check validation with list values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist=abc,def,ghi\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -687,7 +700,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_check_list_cleanup(self, tmp_path: Path):
|
||||
"""Test check validation cleans up list values"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nlist=ab-c,de_f,gh!i\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -700,7 +713,7 @@ class TestLoadSettings:
|
||||
|
||||
def test_load_settings_invalid_check_type(self, tmp_path: Path):
|
||||
"""Test with invalid check type"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text("[TestSection]\nvalue=test\n")
|
||||
|
||||
loader = SettingsLoader(args={}, config_file=config_file)
|
||||
@@ -717,7 +730,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_complex_validation_scenario(self, tmp_path: Path):
|
||||
"""Test complex scenario with multiple validations"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Production]\n"
|
||||
"environment=production\n"
|
||||
@@ -758,7 +771,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_email_list_validation(self, tmp_path: Path):
|
||||
"""Test email list with validation"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[EmailConfig]\n"
|
||||
"emails=test@example.com,admin@domain.org,user+tag@site.co.uk\n"
|
||||
@@ -775,7 +788,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_mixed_args_and_config(self, tmp_path: Path):
|
||||
"""Test mixing command line args and config file"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Settings]\n"
|
||||
"value1=config_value1\n"
|
||||
@@ -796,7 +809,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_multiple_check_types(self, tmp_path: Path):
|
||||
"""Test multiple different check types"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Checks]\n"
|
||||
"numbers=123,456,789\n"
|
||||
@@ -823,7 +836,7 @@ class TestComplexScenarios:
|
||||
|
||||
def test_args_no_and_list_skip_combination(self, tmp_path: Path, capsys: CaptureFixture[str]):
|
||||
"""Test combination of args_override:yes flag and list argument skip behavior"""
|
||||
config_file = tmp_path / "test.ini"
|
||||
config_file = tmp_path.joinpath("test.ini")
|
||||
config_file.write_text(
|
||||
"[Settings]\n"
|
||||
"no_override=a,b,c\n"
|
||||
|
||||
@@ -4,7 +4,101 @@ tests for corelibs.iterator_handling.fingerprint
|
||||
|
||||
from typing import Any
|
||||
import pytest
|
||||
from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc
|
||||
from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc, hash_object
|
||||
|
||||
|
||||
class TestHashObject:
|
||||
"""Tests for hash_object function"""
|
||||
|
||||
def test_hash_object_simple_dict(self):
|
||||
"""Test hashing a simple dictionary with hash_object"""
|
||||
data = {"key1": "value1", "key2": "value2"}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64 # SHA256 produces 64 hex characters
|
||||
|
||||
def test_hash_object_mixed_keys(self):
|
||||
"""Test hash_object with mixed int and string keys"""
|
||||
data = {"key1": "value1", 1: "value2", 2: "value3"}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_consistency(self):
|
||||
"""Test that hash_object produces consistent results"""
|
||||
data = {"str_key": "value", 123: "number_key"}
|
||||
hash1 = hash_object(data)
|
||||
hash2 = hash_object(data)
|
||||
|
||||
assert hash1 == hash2
|
||||
|
||||
def test_hash_object_order_independence(self):
|
||||
"""Test that hash_object is order-independent"""
|
||||
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
|
||||
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
|
||||
hash1 = hash_object(data1)
|
||||
hash2 = hash_object(data2)
|
||||
|
||||
assert hash1 == hash2
|
||||
|
||||
def test_hash_object_list_of_dicts_mixed_keys(self):
|
||||
"""Test hash_object with list of dicts containing mixed keys"""
|
||||
data = [
|
||||
{"name": "item1", 1: "value1"},
|
||||
{"name": "item2", 2: "value2"}
|
||||
]
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_nested_mixed_keys(self):
|
||||
"""Test hash_object with nested structures containing mixed keys"""
|
||||
data = {
|
||||
"outer": {
|
||||
"inner": "value",
|
||||
1: "mixed_key"
|
||||
},
|
||||
2: "another_mixed"
|
||||
}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_different_data(self):
|
||||
"""Test that different data produces different hashes"""
|
||||
data1 = {"key": "value", 1: "one"}
|
||||
data2 = {"key": "value", 2: "two"}
|
||||
hash1 = hash_object(data1)
|
||||
hash2 = hash_object(data2)
|
||||
|
||||
assert hash1 != hash2
|
||||
|
||||
def test_hash_object_complex_nested(self):
|
||||
"""Test hash_object with complex nested structures"""
|
||||
data = {
|
||||
"level1": {
|
||||
"level2": {
|
||||
1: "value",
|
||||
"key": [1, 2, {"nested": "deep", 3: "int_key"}]
|
||||
}
|
||||
}
|
||||
}
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_hash_object_list_with_tuples(self):
|
||||
"""Test hash_object with lists containing tuples"""
|
||||
data = [("a", 1), ("b", 2), {1: "mixed", "key": "value"}]
|
||||
result = hash_object(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
|
||||
class TestDictHashFrozen:
|
||||
@@ -279,6 +373,116 @@ class TestDictHashCrc:
|
||||
assert isinstance(result, str)
|
||||
assert len(result) == 64
|
||||
|
||||
def test_dict_hash_crc_fallback_mixed_keys(self):
|
||||
"""Test dict_hash_crc fallback with mixed int and string keys"""
|
||||
data = {"key1": "value1", 1: "value2", 2: "value3"}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
# Fallback prefixes with "HO_"
|
||||
assert result.startswith("HO_")
|
||||
# Hash should be 64 chars + 3 char prefix = 67 total
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_fallback_consistency(self):
|
||||
"""Test that fallback produces consistent hashes"""
|
||||
data = {"str_key": "value", 123: "number_key", 456: "another"}
|
||||
hash1 = dict_hash_crc(data)
|
||||
hash2 = dict_hash_crc(data)
|
||||
|
||||
assert hash1 == hash2
|
||||
assert hash1.startswith("HO_")
|
||||
|
||||
def test_dict_hash_crc_fallback_order_independence(self):
|
||||
"""Test that fallback is order-independent for mixed-key dicts"""
|
||||
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
|
||||
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
|
||||
hash1 = dict_hash_crc(data1)
|
||||
hash2 = dict_hash_crc(data2)
|
||||
|
||||
assert hash1 == hash2
|
||||
assert hash1.startswith("HO_")
|
||||
|
||||
def test_dict_hash_crc_fallback_list_of_dicts_mixed_keys(self):
|
||||
"""Test fallback with list of dicts containing mixed keys"""
|
||||
data = [
|
||||
{"name": "item1", 1: "value1"},
|
||||
{"name": "item2", 2: "value2"},
|
||||
{3: "value3", "type": "mixed"}
|
||||
]
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result.startswith("HO_")
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_fallback_nested_mixed_keys(self):
|
||||
"""Test fallback with nested dicts containing mixed keys"""
|
||||
data = {
|
||||
"outer": {
|
||||
"inner": "value",
|
||||
1: "mixed_key"
|
||||
},
|
||||
2: "another_mixed"
|
||||
}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result.startswith("HO_")
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_fallback_different_data(self):
|
||||
"""Test that different mixed-key data produces different hashes"""
|
||||
data1 = {"key": "value", 1: "one"}
|
||||
data2 = {"key": "value", 2: "two"}
|
||||
hash1 = dict_hash_crc(data1)
|
||||
hash2 = dict_hash_crc(data2)
|
||||
|
||||
assert hash1 != hash2
|
||||
assert hash1.startswith("HO_")
|
||||
assert hash2.startswith("HO_")
|
||||
|
||||
def test_dict_hash_crc_fallback_complex_structure(self):
|
||||
"""Test fallback with complex nested structure with mixed keys"""
|
||||
data = [
|
||||
{
|
||||
"id": 1,
|
||||
1: "first",
|
||||
"data": {
|
||||
"nested": "value",
|
||||
100: "nested_int_key"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
2: "second",
|
||||
"items": [1, 2, 3]
|
||||
}
|
||||
]
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result.startswith("HO_")
|
||||
assert len(result) == 67
|
||||
|
||||
def test_dict_hash_crc_no_fallback_string_keys_only(self):
|
||||
"""Test that string-only keys don't trigger fallback"""
|
||||
data = {"key1": "value1", "key2": "value2", "key3": "value3"}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert not result.startswith("HO_")
|
||||
assert len(result) == 64
|
||||
|
||||
def test_dict_hash_crc_no_fallback_int_keys_only(self):
|
||||
"""Test that int-only keys don't trigger fallback"""
|
||||
data = {1: "one", 2: "two", 3: "three"}
|
||||
result = dict_hash_crc(data)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert not result.startswith("HO_")
|
||||
assert len(result) == 64
|
||||
|
||||
|
||||
class TestComparisonBetweenHashFunctions:
|
||||
"""Tests comparing dict_hash_frozen and dict_hash_crc"""
|
||||
|
||||
@@ -4,7 +4,7 @@ iterator_handling.list_helepr tests
|
||||
|
||||
from typing import Any
|
||||
import pytest
|
||||
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list
|
||||
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list, make_unique_list_of_dicts
|
||||
|
||||
|
||||
class TestConvertToList:
|
||||
@@ -298,3 +298,225 @@ class TestPerformance:
|
||||
# Should still work correctly despite duplicates
|
||||
assert set(result) == {1, 3}
|
||||
assert isinstance(result, list)
|
||||
|
||||
|
||||
class TestMakeUniqueListOfDicts:
|
||||
"""Test cases for make_unique_list_of_dicts function"""
|
||||
|
||||
def test_basic_duplicate_removal(self):
|
||||
"""Test basic removal of duplicate dictionaries"""
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 3, "b": 4}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": 1, "b": 2} in result
|
||||
assert {"a": 3, "b": 4} in result
|
||||
|
||||
def test_order_independent_duplicates(self):
|
||||
"""Test that dictionaries with different key orders are treated as duplicates"""
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"b": 2, "a": 1}, # Same content, different order
|
||||
{"a": 3, "b": 4}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": 1, "b": 2} in result
|
||||
assert {"a": 3, "b": 4} in result
|
||||
|
||||
def test_empty_list(self):
|
||||
"""Test with empty list"""
|
||||
result = make_unique_list_of_dicts([])
|
||||
assert result == []
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_single_dict(self):
|
||||
"""Test with single dictionary"""
|
||||
dict_list = [{"a": 1, "b": 2}]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert result == [{"a": 1, "b": 2}]
|
||||
|
||||
def test_all_unique(self):
|
||||
"""Test when all dictionaries are unique"""
|
||||
dict_list = [
|
||||
{"a": 1},
|
||||
{"b": 2},
|
||||
{"c": 3},
|
||||
{"d": 4}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 4
|
||||
for d in dict_list:
|
||||
assert d in result
|
||||
|
||||
def test_all_duplicates(self):
|
||||
"""Test when all dictionaries are duplicates"""
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 1, "b": 2},
|
||||
{"b": 2, "a": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 1
|
||||
assert result[0] == {"a": 1, "b": 2}
|
||||
|
||||
def test_nested_values(self):
|
||||
"""Test with nested structures as values"""
|
||||
dict_list = [
|
||||
{"a": [1, 2], "b": 3},
|
||||
{"a": [1, 2], "b": 3},
|
||||
{"a": [1, 3], "b": 3}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": [1, 2], "b": 3} in result
|
||||
assert {"a": [1, 3], "b": 3} in result
|
||||
|
||||
def test_different_value_types(self):
|
||||
"""Test with different value types"""
|
||||
dict_list = [
|
||||
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
|
||||
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
|
||||
{"str": "world", "int": 99, "float": 2.71, "bool": False}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_empty_dicts(self):
|
||||
"""Test with empty dictionaries"""
|
||||
dict_list: list[Any] = [
|
||||
{},
|
||||
{},
|
||||
{"a": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {} in result
|
||||
assert {"a": 1} in result
|
||||
|
||||
def test_single_key_dicts(self):
|
||||
"""Test with single key dictionaries"""
|
||||
dict_list = [
|
||||
{"a": 1},
|
||||
{"a": 1},
|
||||
{"a": 2},
|
||||
{"b": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 3
|
||||
assert {"a": 1} in result
|
||||
assert {"a": 2} in result
|
||||
assert {"b": 1} in result
|
||||
|
||||
def test_many_keys(self):
|
||||
"""Test with dictionaries containing many keys"""
|
||||
dict1 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
|
||||
dict2 = {"e": 5, "d": 4, "c": 3, "b": 2, "a": 1} # Same, different order
|
||||
dict3 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 6} # Different value
|
||||
dict_list = [dict1, dict2, dict3]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_numeric_keys(self):
|
||||
"""Test with numeric keys"""
|
||||
dict_list = [
|
||||
{1: "one", 2: "two"},
|
||||
{2: "two", 1: "one"},
|
||||
{1: "one", 2: "three"}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_none_values(self):
|
||||
"""Test with None values"""
|
||||
dict_list = [
|
||||
{"a": None, "b": 2},
|
||||
{"a": None, "b": 2},
|
||||
{"a": 1, "b": None}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"a": None, "b": 2} in result
|
||||
assert {"a": 1, "b": None} in result
|
||||
|
||||
def test_mixed_key_types(self):
|
||||
"""Test with mixed key types (string and numeric)"""
|
||||
dict_list = [
|
||||
{"a": 1, 1: "one"},
|
||||
{1: "one", "a": 1},
|
||||
{"a": 2, 1: "one"}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
@pytest.mark.parametrize("dict_list,expected_length", [
|
||||
([{"a": 1}, {"a": 1}, {"a": 1}], 1),
|
||||
([{"a": 1}, {"a": 2}, {"a": 3}], 3),
|
||||
([{"a": 1, "b": 2}, {"b": 2, "a": 1}], 1),
|
||||
([{}, {}], 1),
|
||||
([{"x": [1, 2]}, {"x": [1, 2]}], 1),
|
||||
([{"a": 1}, {"b": 2}, {"c": 3}], 3),
|
||||
]) # pyright: ignore[reportUnknownArgumentType]
|
||||
def test_parametrized_unique_dicts(self, dict_list: list[Any], expected_length: int):
|
||||
"""Test make_unique_list_of_dicts with various input combinations"""
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == expected_length
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_large_list(self):
|
||||
"""Test with a large list of dictionaries"""
|
||||
dict_list = [{"id": i % 100, "value": f"val_{i % 100}"} for i in range(1000)]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
# Should have 100 unique dicts (0-99)
|
||||
assert len(result) == 100
|
||||
|
||||
def test_preserves_last_occurrence(self):
|
||||
"""Test behavior with duplicate entries"""
|
||||
# The function uses dict comprehension, which keeps last occurrence
|
||||
dict_list = [
|
||||
{"a": 1, "b": 2},
|
||||
{"a": 3, "b": 4},
|
||||
{"a": 1, "b": 2}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
# Just verify correct unique count, order may vary
|
||||
|
||||
def test_nested_dicts(self):
|
||||
"""Test with nested dictionaries"""
|
||||
dict_list = [
|
||||
{"outer": {"inner": 1}},
|
||||
{"outer": {"inner": 1}},
|
||||
{"outer": {"inner": 2}}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_string_values_case_sensitive(self):
|
||||
"""Test that string values are case-sensitive"""
|
||||
dict_list = [
|
||||
{"name": "John"},
|
||||
{"name": "john"},
|
||||
{"name": "JOHN"},
|
||||
{"name": "John"}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 3
|
||||
|
||||
def test_boolean_values(self):
|
||||
"""Test with boolean values"""
|
||||
dict_list = [
|
||||
{"flag": True, "count": 1},
|
||||
{"count": 1, "flag": True},
|
||||
{"flag": False, "count": 1}
|
||||
]
|
||||
result = make_unique_list_of_dicts(dict_list)
|
||||
assert len(result) == 2
|
||||
assert {"flag": True, "count": 1} in result
|
||||
assert {"flag": False, "count": 1} in result
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -28,6 +28,7 @@ def tmp_log_path(tmp_path: Path) -> Path:
|
||||
@pytest.fixture
|
||||
def basic_log_settings() -> LogSettings:
|
||||
"""Basic log settings for testing"""
|
||||
# Return a new dict each time to avoid state pollution
|
||||
return {
|
||||
"log_level_console": LoggingLevel.WARNING,
|
||||
"log_level_file": LoggingLevel.DEBUG,
|
||||
@@ -308,4 +309,54 @@ class TestUpdateConsoleFormatter:
|
||||
# Verify message was logged
|
||||
assert "Test warning message" in caplog.text
|
||||
|
||||
def test_log_console_format_option_set_to_none(
|
||||
self, tmp_log_path: Path
|
||||
):
|
||||
"""Test that when log_console_format option is set to None, it uses ConsoleFormatSettings.ALL"""
|
||||
# Save the original DEFAULT_LOG_SETTINGS to restore it after test
|
||||
original_default = Log.DEFAULT_LOG_SETTINGS.copy()
|
||||
|
||||
try:
|
||||
# Reset DEFAULT_LOG_SETTINGS to ensure clean state
|
||||
Log.DEFAULT_LOG_SETTINGS = {
|
||||
"log_level_console": Log.DEFAULT_LOG_LEVEL_CONSOLE,
|
||||
"log_level_file": Log.DEFAULT_LOG_LEVEL_FILE,
|
||||
"per_run_log": False,
|
||||
"console_enabled": True,
|
||||
"console_color_output_enabled": True,
|
||||
"console_format_type": ConsoleFormatSettings.ALL,
|
||||
"add_start_info": True,
|
||||
"add_end_info": False,
|
||||
"log_queue": None,
|
||||
}
|
||||
|
||||
# Create a fresh settings dict with console_format_type explicitly set to None
|
||||
settings: LogSettings = {
|
||||
"log_level_console": LoggingLevel.WARNING,
|
||||
"log_level_file": LoggingLevel.DEBUG,
|
||||
"per_run_log": False,
|
||||
"console_enabled": True,
|
||||
"console_color_output_enabled": False,
|
||||
"console_format_type": None, # type: ignore
|
||||
"add_start_info": False,
|
||||
"add_end_info": False,
|
||||
"log_queue": None,
|
||||
}
|
||||
|
||||
# Verify that None is explicitly set in the input
|
||||
assert settings['console_format_type'] is None
|
||||
|
||||
log = Log(
|
||||
log_path=tmp_log_path,
|
||||
log_name="test_log",
|
||||
log_settings=settings
|
||||
)
|
||||
|
||||
# Verify that None was replaced with ConsoleFormatSettings.ALL
|
||||
# The Log class should replace None with the default value (ALL)
|
||||
assert log.log_settings['console_format_type'] == ConsoleFormatSettings.ALL
|
||||
finally:
|
||||
# Restore original DEFAULT_LOG_SETTINGS
|
||||
Log.DEFAULT_LOG_SETTINGS = original_default
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -2,11 +2,10 @@
|
||||
PyTest: requests_handling/caller
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
from unittest.mock import Mock, patch
|
||||
import pytest
|
||||
import requests
|
||||
from corelibs.requests_handling.caller import Caller
|
||||
from corelibs.requests_handling.caller import Caller, ErrorResponse, ProxyConfig
|
||||
|
||||
|
||||
class TestCallerInit:
|
||||
@@ -21,13 +20,17 @@ class TestCallerInit:
|
||||
assert caller.timeout == 20
|
||||
assert caller.verify is True
|
||||
assert caller.proxy is None
|
||||
assert caller.cafile is None
|
||||
assert caller.ca_file is None
|
||||
|
||||
def test_init_with_all_params(self):
|
||||
"""Test Caller initialization with all parameters"""
|
||||
header = {"Authorization": "Bearer token", "Content-Type": "application/json"}
|
||||
proxy = {"http": "http://proxy.example.com:8080", "https": "https://proxy.example.com:8080"}
|
||||
caller = Caller(header=header, verify=False, timeout=30, proxy=proxy)
|
||||
proxy: ProxyConfig = {
|
||||
"type": "socks5",
|
||||
"host": "proxy.example.com:8080",
|
||||
"port": "8080"
|
||||
}
|
||||
caller = Caller(header=header, timeout=30, proxy=proxy, verify=False)
|
||||
|
||||
assert caller.headers == header
|
||||
assert caller.timeout == 30
|
||||
@@ -58,7 +61,7 @@ class TestCallerInit:
|
||||
ca_file_path = "/path/to/ca/cert.pem"
|
||||
caller = Caller(header={}, ca_file=ca_file_path)
|
||||
|
||||
assert caller.cafile == ca_file_path
|
||||
assert caller.ca_file == ca_file_path
|
||||
|
||||
|
||||
class TestCallerGet:
|
||||
@@ -81,7 +84,8 @@ class TestCallerGet:
|
||||
headers={"Authorization": "Bearer token"},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.get')
|
||||
@@ -101,7 +105,8 @@ class TestCallerGet:
|
||||
headers={},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.get')
|
||||
@@ -134,7 +139,11 @@ class TestCallerGet:
|
||||
mock_response = Mock(spec=requests.Response)
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
proxy = {"http": "http://proxy.example.com:8080"}
|
||||
proxy: ProxyConfig = {
|
||||
"type": "socks5",
|
||||
"host": "proxy.example.com:8080",
|
||||
"port": "8080"
|
||||
}
|
||||
caller = Caller(header={}, proxy=proxy)
|
||||
caller.get("https://api.example.com/data")
|
||||
|
||||
@@ -142,40 +151,46 @@ class TestCallerGet:
|
||||
assert mock_get.call_args[1]["proxies"] == proxy
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.get')
|
||||
def test_get_invalid_schema_returns_none(self, mock_get: Mock, capsys: Any):
|
||||
"""Test GET request with invalid URL schema returns None"""
|
||||
def test_get_invalid_schema_returns_none(self, mock_get: Mock):
|
||||
"""Test GET request with invalid URL schema returns ErrorResponse"""
|
||||
mock_get.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.get("invalid://example.com")
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Invalid URL during 'get'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 200
|
||||
assert "Invalid URL during 'get'" in response.message
|
||||
assert response.action == "get"
|
||||
assert response.url == "invalid://example.com"
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.get')
|
||||
def test_get_timeout_returns_none(self, mock_get: Mock, capsys: Any):
|
||||
"""Test GET request timeout returns None"""
|
||||
def test_get_timeout_returns_none(self, mock_get: Mock):
|
||||
"""Test GET request timeout returns ErrorResponse"""
|
||||
mock_get.side_effect = requests.exceptions.ReadTimeout("Timeout")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.get("https://api.example.com/data")
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Timeout (20s) during 'get'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 300
|
||||
assert "Timeout (20s) during 'get'" in response.message
|
||||
assert response.action == "get"
|
||||
assert response.url == "https://api.example.com/data"
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.get')
|
||||
def test_get_connection_error_returns_none(self, mock_get: Mock, capsys: Any):
|
||||
"""Test GET request connection error returns None"""
|
||||
def test_get_connection_error_returns_none(self, mock_get: Mock):
|
||||
"""Test GET request connection error returns ErrorResponse"""
|
||||
mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.get("https://api.example.com/data")
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Connection error during 'get'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 400
|
||||
assert "Connection error during 'get'" in response.message
|
||||
assert response.action == "get"
|
||||
assert response.url == "https://api.example.com/data"
|
||||
|
||||
|
||||
class TestCallerPost:
|
||||
@@ -200,7 +215,8 @@ class TestCallerPost:
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.post')
|
||||
@@ -234,40 +250,46 @@ class TestCallerPost:
|
||||
assert mock_post.call_args[1]["json"] == data
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.post')
|
||||
def test_post_invalid_schema_returns_none(self, mock_post: Mock, capsys: Any):
|
||||
"""Test POST request with invalid URL schema returns None"""
|
||||
def test_post_invalid_schema_returns_none(self, mock_post: Mock):
|
||||
"""Test POST request with invalid URL schema returns ErrorResponse"""
|
||||
mock_post.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.post("invalid://example.com", data={"test": "data"})
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Invalid URL during 'post'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 200
|
||||
assert "Invalid URL during 'post'" in response.message
|
||||
assert response.action == "post"
|
||||
assert response.url == "invalid://example.com"
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.post')
|
||||
def test_post_timeout_returns_none(self, mock_post: Mock, capsys: Any):
|
||||
"""Test POST request timeout returns None"""
|
||||
def test_post_timeout_returns_none(self, mock_post: Mock):
|
||||
"""Test POST request timeout returns ErrorResponse"""
|
||||
mock_post.side_effect = requests.exceptions.ReadTimeout("Timeout")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.post("https://api.example.com/data", data={"test": "data"})
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Timeout (20s) during 'post'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 300
|
||||
assert "Timeout (20s) during 'post'" in response.message
|
||||
assert response.action == "post"
|
||||
assert response.url == "https://api.example.com/data"
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.post')
|
||||
def test_post_connection_error_returns_none(self, mock_post: Mock, capsys: Any):
|
||||
"""Test POST request connection error returns None"""
|
||||
def test_post_connection_error_returns_none(self, mock_post: Mock):
|
||||
"""Test POST request connection error returns ErrorResponse"""
|
||||
mock_post.side_effect = requests.exceptions.ConnectionError("Connection failed")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.post("https://api.example.com/data", data={"test": "data"})
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Connection error during 'post'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 400
|
||||
assert "Connection error during 'post'" in response.message
|
||||
assert response.action == "post"
|
||||
assert response.url == "https://api.example.com/data"
|
||||
|
||||
|
||||
class TestCallerPut:
|
||||
@@ -292,7 +314,8 @@ class TestCallerPut:
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.put')
|
||||
@@ -311,16 +334,18 @@ class TestCallerPut:
|
||||
assert mock_put.call_args[1]["params"] == params
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.put')
|
||||
def test_put_timeout_returns_none(self, mock_put: Mock, capsys: Any):
|
||||
"""Test PUT request timeout returns None"""
|
||||
def test_put_timeout_returns_none(self, mock_put: Mock):
|
||||
"""Test PUT request timeout returns ErrorResponse"""
|
||||
mock_put.side_effect = requests.exceptions.ReadTimeout("Timeout")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.put("https://api.example.com/data/1", data={"test": "data"})
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Timeout (20s) during 'put'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 300
|
||||
assert "Timeout (20s) during 'put'" in response.message
|
||||
assert response.action == "put"
|
||||
assert response.url == "https://api.example.com/data/1"
|
||||
|
||||
|
||||
class TestCallerPatch:
|
||||
@@ -345,7 +370,8 @@ class TestCallerPatch:
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.patch')
|
||||
@@ -364,16 +390,18 @@ class TestCallerPatch:
|
||||
assert mock_patch.call_args[1]["params"] == params
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.patch')
|
||||
def test_patch_connection_error_returns_none(self, mock_patch: Mock, capsys: Any):
|
||||
"""Test PATCH request connection error returns None"""
|
||||
def test_patch_connection_error_returns_none(self, mock_patch: Mock):
|
||||
"""Test PATCH request connection error returns ErrorResponse"""
|
||||
mock_patch.side_effect = requests.exceptions.ConnectionError("Connection failed")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.patch("https://api.example.com/data/1", data={"test": "data"})
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Connection error during 'patch'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 400
|
||||
assert "Connection error during 'patch'" in response.message
|
||||
assert response.action == "patch"
|
||||
assert response.url == "https://api.example.com/data/1"
|
||||
|
||||
|
||||
class TestCallerDelete:
|
||||
@@ -396,7 +424,8 @@ class TestCallerDelete:
|
||||
headers={"Authorization": "Bearer token"},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.delete')
|
||||
@@ -414,16 +443,18 @@ class TestCallerDelete:
|
||||
assert mock_delete.call_args[1]["params"] == params
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.delete')
|
||||
def test_delete_invalid_schema_returns_none(self, mock_delete: Mock, capsys: Any):
|
||||
"""Test DELETE request with invalid URL schema returns None"""
|
||||
def test_delete_invalid_schema_returns_none(self, mock_delete: Mock):
|
||||
"""Test DELETE request with invalid URL schema returns ErrorResponse"""
|
||||
mock_delete.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
|
||||
|
||||
caller = Caller(header={})
|
||||
response = caller.delete("invalid://example.com/data/1")
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert "Invalid URL during 'delete'" in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert response.code == 200
|
||||
assert "Invalid URL during 'delete'" in response.message
|
||||
assert response.action == "delete"
|
||||
assert response.url == "invalid://example.com/data/1"
|
||||
|
||||
|
||||
class TestCallerParametrized:
|
||||
@@ -492,7 +523,7 @@ class TestCallerParametrized:
|
||||
])
|
||||
@patch('corelibs.requests_handling.caller.requests.get')
|
||||
def test_exception_handling(
|
||||
self, mock_get: Mock, exception_class: type, expected_message: str, capsys: Any
|
||||
self, mock_get: Mock, exception_class: type, expected_message: str
|
||||
):
|
||||
"""Test exception handling for all exception types"""
|
||||
mock_get.side_effect = exception_class("Test error")
|
||||
@@ -500,9 +531,8 @@ class TestCallerParametrized:
|
||||
caller = Caller(header={})
|
||||
response = caller.get("https://api.example.com/data")
|
||||
|
||||
assert response is None
|
||||
captured = capsys.readouterr()
|
||||
assert expected_message in captured.out
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert expected_message in response.message
|
||||
|
||||
|
||||
class TestCallerIntegration:
|
||||
@@ -599,7 +629,8 @@ class TestCallerEdgeCases:
|
||||
headers={},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.post')
|
||||
@@ -659,7 +690,8 @@ class TestCallerEdgeCases:
|
||||
headers={},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
@patch('corelibs.requests_handling.caller.requests.get')
|
||||
@@ -679,7 +711,8 @@ class TestCallerEdgeCases:
|
||||
headers={},
|
||||
timeout=20,
|
||||
verify=True,
|
||||
proxies=None
|
||||
proxies=None,
|
||||
cert=None
|
||||
)
|
||||
|
||||
def test_timeout_zero(self):
|
||||
@@ -730,9 +763,10 @@ class TestCallerProxyHandling:
|
||||
mock_response = Mock(spec=requests.Response)
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
proxy = {
|
||||
"http": "http://proxy.example.com:8080",
|
||||
"https": "https://proxy.example.com:8080"
|
||||
proxy: ProxyConfig = {
|
||||
"type": "socks5",
|
||||
"host": "proxy.example.com:8080",
|
||||
"port": "8080"
|
||||
}
|
||||
caller = Caller(header={}, proxy=proxy)
|
||||
caller.get("https://api.example.com/data")
|
||||
@@ -746,9 +780,10 @@ class TestCallerProxyHandling:
|
||||
mock_response = Mock(spec=requests.Response)
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
proxy = {
|
||||
"http": "http://user:pass@proxy.example.com:8080",
|
||||
"https": "https://user:pass@proxy.example.com:8080"
|
||||
proxy: ProxyConfig = {
|
||||
"type": "socks5",
|
||||
"host": "proxy.example.com:8080",
|
||||
"port": "8080"
|
||||
}
|
||||
caller = Caller(header={}, proxy=proxy)
|
||||
caller.post("https://api.example.com/data", data={"test": "data"})
|
||||
@@ -789,7 +824,7 @@ class TestCallerResponseHandling:
|
||||
caller = Caller(header={})
|
||||
response = caller.get("https://api.example.com/data")
|
||||
|
||||
assert response is not None
|
||||
assert not isinstance(response, ErrorResponse)
|
||||
assert response.status_code == 200
|
||||
assert response.text == "Success"
|
||||
assert response.json() == {"status": "ok"}
|
||||
@@ -805,7 +840,7 @@ class TestCallerResponseHandling:
|
||||
caller = Caller(header={})
|
||||
response = caller.get("https://api.example.com/data")
|
||||
|
||||
assert response is not None
|
||||
assert not isinstance(response, ErrorResponse)
|
||||
assert response.status_code == status_code
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user