Compare commits

...

24 Commits

Author SHA1 Message Date
Clemens Schwaighofer
5319a059ad Update the Caller class
- now returns ErrorResponse values instead of None on errors
- renamed the cafile parameter to ca_file and changed its position in the init method
- proxy now uses the ProxyConfig TypedDict format

Test updates to reflect those changes
2026-01-30 18:17:41 +09:00
Clemens Schwaighofer
163b8c4018 Update Caller class, backport from the GitHub manage script 2026-01-30 17:32:30 +09:00
Clemens Schwaighofer
6322b95068 v0.47.0: fingerprint update with fallback for str/int index overlaps 2026-01-27 17:15:32 +09:00
Clemens Schwaighofer
715ed1f9c2 Docblock updates in iterator handling fingerprint 2026-01-27 17:14:31 +09:00
Clemens Schwaighofer
82a759dd21 Fix fingerprint with mixed int and str keys
Create a fallback hash function to handle mixed key types in dictionaries
and lists, ensuring consistent hashing across different data structures.

The fallback hash is prefixed with "HO_" to indicate it was used.
2026-01-27 15:59:38 +09:00
Clemens Schwaighofer
fe913608c4 Fix iteration list helpers dict list type 2026-01-27 14:52:11 +09:00
Clemens Schwaighofer
79f9c5d1c6 iterator list helpers tests run cases updated 2026-01-27 14:51:25 +09:00
Clemens Schwaighofer
3d091129e2 v0.46.0: Add unique list helper function 2026-01-27 14:43:35 +09:00
Clemens Schwaighofer
1a978f786d Add a list helper to create a unique list of dictionaries, and tests for it. 2026-01-27 14:42:19 +09:00
Clemens Schwaighofer
51669d3c5f Settings loader test-run add boolean convert check test 2026-01-23 18:07:52 +09:00
Clemens Schwaighofer
d128dcb479 v0.45.1: Fix Log with log console format set to None 2026-01-23 15:16:38 +09:00
Clemens Schwaighofer
84286593f6 Log: fix bug where the log console format set to None would throw an exception
Also add the prefix "[SettingsLoader] " to print statements in SettingsLoader if we do not write to the log
2026-01-23 15:14:31 +09:00
Clemens Schwaighofer
8d97f09e5e v0.45.0: Log add function to get console formatter flags set 2026-01-23 11:37:02 +09:00
Clemens Schwaighofer
2748bc19be Log: add get_console_formatter method
Returns the current flags set for the console formatter
2026-01-23 11:33:38 +09:00
Clemens Schwaighofer
0b3c8fc774 v0.44.2: Move the compiled regex into dedicated file 2026-01-09 16:17:27 +09:00
Clemens Schwaighofer
7da18e0f00 Moved the compiled regex patterns to a new file regex_constants_compiled
So we do not force compiling the patterns when they are not needed
2026-01-09 16:15:38 +09:00
Clemens Schwaighofer
49e38081ad v0.44.1: add pre compiled regexes 2026-01-08 15:16:26 +09:00
Clemens Schwaighofer
a14f993a31 Add pre-compiled REGEX entries to the regex pattern file
compiled ones are prefixed with COMPILED_
2026-01-08 15:14:48 +09:00
Clemens Schwaighofer
ae938f9909 v0.44.0: Add more REGEX patterns for email matching 2026-01-08 14:59:49 +09:00
Clemens Schwaighofer
f91e0bb93a Add new regex constants for email handling and update related tests 2026-01-08 14:58:14 +09:00
Clemens Schwaighofer
d3f61005cf v0.43.4: Fix for config loader when splitting empty values into lists 2026-01-06 10:04:03 +09:00
Clemens Schwaighofer
2923a3e88b Fix settings loader to return empty list when splitting empty string value 2026-01-06 09:58:21 +09:00
Clemens Schwaighofer
a73ced0067 v0.43.3: settings loader raise exception and log message text split 2025-12-24 10:25:42 +09:00
Clemens Schwaighofer
f89b91fe7f Settings loader: use a different log string from the ValueError raise string 2025-12-24 10:23:27 +09:00
21 changed files with 1520 additions and 236 deletions

View File

@@ -1,7 +1,7 @@
# MARK: Project info # MARK: Project info
[project] [project]
name = "corelibs" name = "corelibs"
version = "0.43.2" version = "0.47.0"
description = "Collection of utils for Python scripts" description = "Collection of utils for Python scripts"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"

View File

@@ -19,9 +19,26 @@ def compile_re(reg: str) -> re.Pattern[str]:
# email regex # email regex
EMAIL_BASIC_REGEX: str = r""" SUB_EMAIL_BASIC_REGEX: str = r"""
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63} [A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$ @(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}
"""
EMAIL_BASIC_REGEX = rf"^{SUB_EMAIL_BASIC_REGEX}$"
# name + email regex for email sending type like "foo bar" <email@mail.com>
NAME_EMAIL_SIMPLE_REGEX = r"""
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
<(?P<email3>[^>]+)>|
(?P<email4>[^\s<>]+))\s*$
"""
# name + email with the basic regex set
NAME_EMAIL_BASIC_REGEX = rf"""
^\s*(?:
"(?P<name1>[^"]+)"\s*<(?P<email1>{SUB_EMAIL_BASIC_REGEX})>|
(?P<name2>.+?)\s*<(?P<email2>{SUB_EMAIL_BASIC_REGEX})>|
<(?P<email3>{SUB_EMAIL_BASIC_REGEX})>|
(?P<email4>{SUB_EMAIL_BASIC_REGEX})
)\s*$
""" """
# Domain regex with localhost # Domain regex with localhost
DOMAIN_WITH_LOCALHOST_REGEX: str = r""" DOMAIN_WITH_LOCALHOST_REGEX: str = r"""
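
A minimal usage sketch for the new composed patterns, using the import path from the tests in this compare; it assumes compile_re applies re.VERBOSE, as the multi-line patterns and the existing tests imply.

from corelibs.check_handling.regex_constants import compile_re, NAME_EMAIL_BASIC_REGEX

# "name1"/"email1" are the named groups for the quoted-name alternative
pattern = compile_re(NAME_EMAIL_BASIC_REGEX)
match = pattern.match('"Jane Doe" <jane@example.com>')
if match:
    print(match.group("name1"), match.group("email1"))  # Jane Doe jane@example.com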

View File

@@ -0,0 +1,23 @@
"""
List of regex compiled strings that can be used
"""
from corelibs.check_handling.regex_constants import (
compile_re,
EMAIL_BASIC_REGEX,
NAME_EMAIL_SIMPLE_REGEX,
NAME_EMAIL_BASIC_REGEX,
DOMAIN_WITH_LOCALHOST_REGEX,
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
DOMAIN_REGEX
)
# all above in compiled form
COMPILED_EMAIL_BASIC_REGEX = compile_re(EMAIL_BASIC_REGEX)
COMPILED_NAME_EMAIL_SIMPLE_REGEX = compile_re(NAME_EMAIL_SIMPLE_REGEX)
COMPILED_NAME_EMAIL_BASIC_REGEX = compile_re(NAME_EMAIL_BASIC_REGEX)
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
COMPILED_DOMAIN_REGEX = compile_re(DOMAIN_REGEX)
# __END__
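
A small sketch of how a consumer might use the pre-compiled constants instead of calling compile_re everywhere; the module path is the new file added above.

from corelibs.check_handling.regex_constants_compiled import COMPILED_EMAIL_BASIC_REGEX

# the constant is already an re.Pattern, so it can be used directly
print(bool(COMPILED_EMAIL_BASIC_REGEX.match("user@example.com")))  # expected: True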

View File

@@ -173,10 +173,13 @@ class SettingsLoader:
args_overrride.append(key) args_overrride.append(key)
if skip: if skip:
continue continue
settings[config_id][key] = [ if settings[config_id][key]:
__value.replace(" ", "") settings[config_id][key] = [
for __value in settings[config_id][key].split(split_char) __value.replace(" ", "")
] for __value in settings[config_id][key].split(split_char)
]
else:
settings[config_id][key] = []
except KeyError as e: except KeyError as e:
raise ValueError(self.__print( raise ValueError(self.__print(
f"[!] Cannot read [{config_id}] block because the entry [{e}] could not be found", f"[!] Cannot read [{config_id}] block because the entry [{e}] could not be found",
@@ -278,11 +281,9 @@ class SettingsLoader:
error = True error = True
self.__print(f"[!] Missing content entry for: {entry}", 'ERROR') self.__print(f"[!] Missing content entry for: {entry}", 'ERROR')
if error is True: if error is True:
self.__print("[!] Missing or incorrect settings data. Cannot proceed", 'CRITICAL')
raise ValueError( raise ValueError(
self.__print( "Missing or incorrect settings data. Cannot proceed: " + "; ".join(self.__error_msg)
"[!] Missing or incorrect settings data. Cannot proceed: " + "; ".join(self.__error_msg),
'CRITICAL'
)
) )
# set empty # set empty
for [entry, empty_set] in entry_set_empty.items(): for [entry, empty_set] in entry_set_empty.items():
@@ -576,7 +577,7 @@ class SettingsLoader:
self.log.logger.log(Log.get_log_level_int(level), msg, stacklevel=2) self.log.logger.log(Log.get_log_level_int(level), msg, stacklevel=2)
if self.log is None or self.always_print: if self.log is None or self.always_print:
if print_error: if print_error:
print(msg) print(f"[SettingsLoader] {msg}")
if level == 'ERROR': if level == 'ERROR':
# remove any prefix [!] for error message list # remove any prefix [!] for error message list
self.__error_msg.append(msg.replace('[!] ', '').strip()) self.__error_msg.append(msg.replace('[!] ', '').strip())
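
The empty-value behaviour above can be exercised as in the new unit test from this compare; a minimal sketch, assuming the SettingsLoader import path (not shown in this diff) and writing a throwaway ini file.

from pathlib import Path
from corelibs.settings_handling.settings_loader import SettingsLoader  # assumed module path

config_file = Path("example.ini")
config_file.write_text("[TestSection]\nlist_field=\n")  # empty value that gets a split rule
loader = SettingsLoader(args={}, config_file=config_file)
result = loader.load_settings("TestSection", {"list_field": ["split:,"]})
print(result["list_field"])  # now [] instead of [''] from splitting an empty string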

View File

@@ -4,6 +4,8 @@ Send email wrapper
import smtplib import smtplib
from email.message import EmailMessage from email.message import EmailMessage
from email.header import Header
from email.utils import formataddr, parseaddr
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
if TYPE_CHECKING: if TYPE_CHECKING:
from corelibs.logging_handling.log import Logger from corelibs.logging_handling.log import Logger
@@ -133,21 +135,30 @@ class SendEmail:
_subject = template["subject"] _subject = template["subject"]
_body = template["body"] _body = template["body"]
for key, value in replace.items(): for key, value in replace.items():
_subject = _subject.replace(f"{{{{{key}}}}}", value) placeholder = f"{{{{{key}}}}}"
_body = _body.replace(f"{{{{{key}}}}}", value) _subject = _subject.replace(placeholder, value)
_body = _body.replace(placeholder, value)
name, addr = parseaddr(from_email)
if name:
# Encode the name part with MIME encoding
encoded_name = str(Header(name, 'utf-8'))
from_email_encoded = formataddr((encoded_name, addr))
else:
from_email_encoded = from_email
# create a simple email and add subhect, from email # create a simple email and add subhect, from email
msg_email = EmailMessage() msg_email = EmailMessage()
# msg.set_content(_body, charset='utf-8', cte='quoted-printable') # msg.set_content(_body, charset='utf-8', cte='quoted-printable')
msg_email.set_content(_body, charset="utf-8") msg_email.set_content(_body, charset="utf-8")
msg_email["Subject"] = _subject msg_email["Subject"] = _subject
msg_email["From"] = from_email msg_email["From"] = from_email_encoded
# push to array for sening # push to array for sening
msg.append(msg_email) msg.append(msg_email)
return msg return msg
def send_email_list( def send_email_list(
self, self,
email: list[EmailMessage], receivers: list[str], emails: list[EmailMessage],
receivers: list[str],
combined_send: bool | None = None, combined_send: bool | None = None,
test_only: bool | None = None test_only: bool | None = None
): ):
@@ -170,18 +181,27 @@ class SendEmail:
smtp = smtplib.SMTP(smtp_host) smtp = smtplib.SMTP(smtp_host)
except ConnectionRefusedError as e: except ConnectionRefusedError as e:
self.log.error("Could not open SMTP connection to: %s, %s", smtp_host, e) self.log.error("Could not open SMTP connection to: %s, %s", smtp_host, e)
# prepare receiver list
receivers_encoded: list[str] = []
for __receiver in receivers:
to_name, to_addr = parseaddr(__receiver)
if to_name:
# Encode the name part with MIME encoding
encoded_to_name = str(Header(to_name, 'utf-8'))
receivers_encoded.append(formataddr((encoded_to_name, to_addr)))
else:
receivers_encoded.append(__receiver)
# loop over messages and then over recievers # loop over messages and then over recievers
for msg in email: for msg in emails:
if combined_send is True: if combined_send is True:
msg["To"] = ", ".join(receivers) msg["To"] = ", ".join(receivers_encoded)
if not self.settings.get('test'): if not self.settings.get('test'):
if smtp is not None: if smtp is not None:
smtp.send_message(msg, msg["From"], receivers) smtp.send_message(msg, msg["From"], receivers_encoded)
else: else:
self.log.info(f"[EMAIL] Test, not sending email\n{msg}") self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
else: else:
for receiver in receivers: for receiver in receivers_encoded:
# send to
self.log.debug(f"===> Send to: {receiver}") self.log.debug(f"===> Send to: {receiver}")
if "To" in msg: if "To" in msg:
msg.replace_header("To", receiver) msg.replace_header("To", receiver)
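
The name-encoding step added above can be illustrated standalone with just the standard library; this is a sketch of what the new code does with the From and To values before setting the headers.

from email.header import Header
from email.utils import formataddr, parseaddr

name, addr = parseaddr('日本語 <japan@jp.net>')
if name:
    # non-ASCII display names get RFC 2047 encoded, the address part stays as-is
    encoded = formataddr((str(Header(name, 'utf-8')), addr))
else:
    encoded = addr
print(encoded)  # something like =?utf-8?b?...?= <japan@jp.net>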

View File

@@ -4,11 +4,38 @@ Various dictionary, object and list hashers
import json import json
import hashlib import hashlib
from typing import Any from typing import Any, cast, Sequence
def hash_object(obj: Any) -> str:
"""
RECOMMENDED for new use
Create a hash for any dict or list with mixed key types
Arguments:
obj {Any} -- _description_
Returns:
str -- _description_
"""
def normalize(o: Any) -> Any:
if isinstance(o, dict):
# Sort by repr of keys to handle mixed types (str, int, etc.)
o = cast(dict[Any, Any], o)
return tuple(sorted((repr(k), normalize(v)) for k, v in o.items()))
if isinstance(o, (list, tuple)):
o = cast(Sequence[Any], o)
return tuple(normalize(item) for item in o)
return repr(o)
normalized = normalize(obj)
return hashlib.sha256(str(normalized).encode()).hexdigest()
def dict_hash_frozen(data: dict[Any, Any]) -> int: def dict_hash_frozen(data: dict[Any, Any]) -> int:
""" """
NOT RECOMMENDED, use dict_hash_crc or hash_object instead
If used, DO NOT CHANGE
hash a dict via freeze hash a dict via freeze
Args: Args:
@@ -22,18 +49,25 @@ def dict_hash_frozen(data: dict[Any, Any]) -> int:
def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str: def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str:
""" """
Create a sha256 hash over dict LEGACY METHOD, must be kept for fallback, if used by other code, DO NOT CHANGE
Create a sha256 hash over dict or list
alternative for alternative for
dict_hash_frozen dict_hash_frozen
Args: Args:
data (dict | list): _description_ data (dict[Any, Any] | list[Any]): _description_
Returns: Returns:
str: _description_ str: sha256 hash, prefiex with HO_ if fallback used
""" """
return hashlib.sha256( try:
json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8') return hashlib.sha256(
).hexdigest() # IT IS IMPORTANT THAT THE BELOW CALL STAYS THE SAME AND DOES NOT CHANGE OR WE WILL GET DIFFERENT HASHES
# separators=(',', ':') to get rid of spaces, but if this is used the hash will be different, DO NOT ADD
json.dumps(data, sort_keys=True, ensure_ascii=True, default=str).encode('utf-8')
).hexdigest()
except TypeError:
# Fallback tod different hasher, will return DIFFERENT hash than above, so only usable in int/str key mixes
return "HO_" + hash_object(data)
# __END__ # __END__
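
A short sketch of the fallback path: json.dumps(..., sort_keys=True) raises TypeError when a dict mixes str and int keys, so dict_hash_crc falls back to hash_object and prefixes the result with "HO_". The import path is the one used by the list helper test in this compare.

from corelibs.iterator_handling.fingerprint import dict_hash_crc, hash_object

print(dict_hash_crc({"a": 1, "b": 2}))    # plain sha256 hex digest, unchanged behaviour
print(dict_hash_crc({"a": 1, 1: "one"}))  # "HO_" + digest from the normalized fallback
print(hash_object({"a": 1, 1: "one"}))    # same digest, without the "HO_" prefix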

View File

@@ -2,6 +2,7 @@
List type helpers List type helpers
""" """
import json
from typing import Any, Sequence from typing import Any, Sequence
@@ -44,4 +45,31 @@ def is_list_in_list(
# Get the difference and extract just the values # Get the difference and extract just the values
return [item for item, _ in set_a - set_b] return [item for item, _ in set_a - set_b]
def make_unique_list_of_dicts(dict_list: list[Any]) -> list[Any]:
"""
Create a list of unique dictionary entries
Arguments:
dict_list {list[Any]} -- _description_
Returns:
list[Any] -- _description_
"""
try:
# try json dumps, can fail with int and str index types
return list(
{
json.dumps(d, sort_keys=True, ensure_ascii=True, separators=(',', ':')): d
for d in dict_list
}.values()
)
except TypeError:
# Fallback for non-serializable entries, slow but works
unique: list[Any] = []
for d in dict_list:
if d not in unique:
unique.append(d)
return unique
# __END__ # __END__
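
A usage sketch for the new helper, with the import path taken from the test file later in this compare; note that one entry with mixed key types pushes the whole call onto the slower membership-check fallback.

from corelibs.iterator_handling.list_helpers import make_unique_list_of_dicts

dict_list = [
    {"a": 1, "b": 2},
    {"b": 2, "a": 1},    # same content, different key order -> deduplicated
    {"a": 1, 1: "one"},  # mixed str/int keys -> json.dumps fails, fallback kicks in
    {"a": 1, 1: "one"},
]
print(make_unique_list_of_dicts(dict_list))  # two unique entries remain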

View File

@@ -602,9 +602,9 @@ class Log(LogParent):
__setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True) __setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
default_log_settings[__log_entry] = __setting default_log_settings[__log_entry] = __setting
# check console log type # check console log type
default_log_settings['console_format_type'] = cast('ConsoleFormat', log_settings.get( if (console_format_type := log_settings.get('console_format_type')) is None:
'console_format_type', self.DEFAULT_LOG_SETTINGS['console_format_type'] console_format_type = self.DEFAULT_LOG_SETTINGS['console_format_type']
)) default_log_settings['console_format_type'] = cast('ConsoleFormat', console_format_type)
# check log queue # check log queue
__setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue']) __setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue'])
if __setting is not None: if __setting is not None:
@@ -774,6 +774,16 @@ class Log(LogParent):
self.__set_console_formatter(console_format_type) self.__set_console_formatter(console_format_type)
) )
def get_console_formatter(self) -> ConsoleFormat:
"""
Get the current console formatter, this the settings type
Note that if eg "ALL" is set it will return the combined information but not the ALL flag name itself
Returns:
ConsoleFormat -- _description_
"""
return self.log_settings['console_format_type']
# MARK: console handler # MARK: console handler
def __create_console_handler( def __create_console_handler(
self, handler_name: str, self, handler_name: str,
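
The walrus-and-fallback change above matters because dict.get(key, default) still returns None when the key is present but explicitly set to None; a standalone sketch with stand-in values:

DEFAULTS = {"console_format_type": "MINIMAL"}  # stand-in default for illustration
log_settings = {"console_format_type": None}   # caller explicitly passed None

if (console_format_type := log_settings.get("console_format_type")) is None:
    console_format_type = DEFAULTS["console_format_type"]
print(console_format_type)  # falls back to the default instead of passing None along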

View File

@@ -3,32 +3,61 @@ requests lib interface
V2 call type V2 call type
""" """
from typing import Any from typing import Any, TypedDict, cast
import warnings
import requests import requests
# to hide the verfiy warnings because of the bad SSL settings from Netskope, Akamai, etc from requests import exceptions
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
class ErrorResponse:
"""
Error response structure. This is returned if a request could not be completed
"""
def __init__(
self,
code: int,
message: str,
action: str,
url: str,
exception: exceptions.InvalidSchema | exceptions.ReadTimeout | exceptions.ConnectionError | None = None
) -> None:
self.code = code
self.message = message
self.action = action
self.url = url
self.exception_name = type(exception).__name__ if exception is not None else None
self.exception_trace = exception if exception is not None else None
class ProxyConfig(TypedDict):
"""
Socks proxy settings
"""
type: str
host: str
port: str
class Caller: class Caller:
"""_summary_""" """
requests lib interface
"""
def __init__( def __init__(
self, self,
header: dict[str, str], header: dict[str, str],
verify: bool = True,
timeout: int = 20, timeout: int = 20,
proxy: dict[str, str] | None = None, proxy: ProxyConfig | None = None,
verify: bool = True,
ca_file: str | None = None ca_file: str | None = None
): ):
self.headers = header self.headers = header
self.timeout: int = timeout self.timeout: int = timeout
self.cafile = ca_file self.ca_file = ca_file
self.verify = verify self.verify = verify
self.proxy = proxy self.proxy = cast(dict[str, str], proxy) if proxy is not None else None
def __timeout(self, timeout: int | None) -> int: def __timeout(self, timeout: int | None) -> int:
if timeout is not None: if timeout is not None and timeout >= 0:
return timeout return timeout
return self.timeout return self.timeout
@@ -39,7 +68,7 @@ class Caller:
data: dict[str, Any] | None = None, data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None, params: dict[str, Any] | None = None,
timeout: int | None = None timeout: int | None = None
) -> requests.Response | None: ) -> requests.Response | ErrorResponse:
""" """
call wrapper, on error returns None call wrapper, on error returns None
@@ -56,67 +85,96 @@ class Caller:
if data is None: if data is None:
data = {} data = {}
try: try:
response = None
if action == "get": if action == "get":
response = requests.get( return requests.get(
url, url,
params=params, params=params,
headers=self.headers, headers=self.headers,
timeout=self.__timeout(timeout), timeout=self.__timeout(timeout),
verify=self.verify, verify=self.verify,
proxies=self.proxy proxies=self.proxy,
cert=self.ca_file
) )
elif action == "post": if action == "post":
response = requests.post( return requests.post(
url, url,
params=params, params=params,
json=data, json=data,
headers=self.headers, headers=self.headers,
timeout=self.__timeout(timeout), timeout=self.__timeout(timeout),
verify=self.verify, verify=self.verify,
proxies=self.proxy proxies=self.proxy,
cert=self.ca_file
) )
elif action == "put": if action == "put":
response = requests.put( return requests.put(
url, url,
params=params, params=params,
json=data, json=data,
headers=self.headers, headers=self.headers,
timeout=self.__timeout(timeout), timeout=self.__timeout(timeout),
verify=self.verify, verify=self.verify,
proxies=self.proxy proxies=self.proxy,
cert=self.ca_file
) )
elif action == "patch": if action == "patch":
response = requests.patch( return requests.patch(
url, url,
params=params, params=params,
json=data, json=data,
headers=self.headers, headers=self.headers,
timeout=self.__timeout(timeout), timeout=self.__timeout(timeout),
verify=self.verify, verify=self.verify,
proxies=self.proxy proxies=self.proxy,
cert=self.ca_file
) )
elif action == "delete": if action == "delete":
response = requests.delete( return requests.delete(
url, url,
params=params, params=params,
headers=self.headers, headers=self.headers,
timeout=self.__timeout(timeout), timeout=self.__timeout(timeout),
verify=self.verify, verify=self.verify,
proxies=self.proxy proxies=self.proxy,
cert=self.ca_file
) )
return response return ErrorResponse(
except requests.exceptions.InvalidSchema as e: 100,
print(f"Invalid URL during '{action}' for {url}:\n\t{e}") f"Unsupported action '{action}'",
return None action,
except requests.exceptions.ReadTimeout as e: url
print(f"Timeout ({self.timeout}s) during '{action}' for {url}:\n\t{e}") )
return None except exceptions.InvalidSchema as e:
except requests.exceptions.ConnectionError as e: return ErrorResponse(
print(f"Connection error during '{action}' for {url}:\n\t{e}") 200,
return None f"Invalid URL during '{action}' for {url}",
action,
url,
e
)
except exceptions.ReadTimeout as e:
return ErrorResponse(
300,
f"Timeout ({self.timeout}s) during '{action}' for {url}",
action,
url,
e
)
except exceptions.ConnectionError as e:
return ErrorResponse(
400,
f"Connection error during '{action}' for {url}",
action,
url,
e
)
def get(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None: def get(
self,
url: str,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
""" """
get data get data
@@ -127,11 +185,15 @@ class Caller:
Returns: Returns:
requests.Response: _description_ requests.Response: _description_
""" """
return self.__call('get', url, params=params) return self.__call('get', url, params=params, timeout=timeout)
def post( def post(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None self,
) -> requests.Response | None: url: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
""" """
post data post data
@@ -143,11 +205,15 @@ class Caller:
Returns: Returns:
requests.Response | None: _description_ requests.Response | None: _description_
""" """
return self.__call('post', url, data, params) return self.__call('post', url, data, params, timeout=timeout)
def put( def put(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None self,
) -> requests.Response | None: url: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
"""_summary_ """_summary_
Args: Args:
@@ -158,11 +224,15 @@ class Caller:
Returns: Returns:
requests.Response | None: _description_ requests.Response | None: _description_
""" """
return self.__call('put', url, data, params) return self.__call('put', url, data, params, timeout=timeout)
def patch( def patch(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None self,
) -> requests.Response | None: url: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
"""_summary_ """_summary_
Args: Args:
@@ -173,9 +243,14 @@ class Caller:
Returns: Returns:
requests.Response | None: _description_ requests.Response | None: _description_
""" """
return self.__call('patch', url, data, params) return self.__call('patch', url, data, params, timeout=timeout)
def delete(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None: def delete(
self,
url: str,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
""" """
delete delete
@@ -186,6 +261,6 @@ class Caller:
Returns: Returns:
requests.Response | None: _description_ requests.Response | None: _description_
""" """
return self.__call('delete', url, params=params) return self.__call('delete', url, params=params, timeout=timeout)
# __END__ # __END__
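
A usage sketch for the reworked Caller; the class, method and ErrorResponse attribute names come from the diff above, but the import path is an assumption.

from corelibs.requests_handling.caller import Caller, ErrorResponse  # assumed module path

caller = Caller(
    header={"Accept": "application/json"},
    timeout=10,
    proxy={"type": "socks5", "host": "127.0.0.1", "port": "1080"},  # ProxyConfig shape
    verify=True,
    ca_file=None,
)
result = caller.get("https://example.com/api", timeout=5)
if isinstance(result, ErrorResponse):
    # errors are now structured objects instead of a bare None
    print(f"[{result.code}] {result.message} ({result.exception_name})")
else:
    print(result.status_code)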

View File

@@ -2,14 +2,28 @@
Test check andling for regex checks Test check andling for regex checks
""" """
import re from corelibs_text_colors.text_colors import Colors
from corelibs.check_handling.regex_constants import DOMAIN_WITH_LOCALHOST_REGEX from corelibs.check_handling.regex_constants import (
compile_re, DOMAIN_WITH_LOCALHOST_REGEX, EMAIL_BASIC_REGEX, NAME_EMAIL_BASIC_REGEX, SUB_EMAIL_BASIC_REGEX
)
from corelibs.check_handling.regex_constants_compiled import (
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX, COMPILED_EMAIL_BASIC_REGEX,
COMPILED_NAME_EMAIL_SIMPLE_REGEX, COMPILED_NAME_EMAIL_BASIC_REGEX
)
NAME_EMAIL_SIMPLE_REGEX = r"""
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
<(?P<email3>[^>]+)>|
(?P<email4>[^\s<>]+))\s*$
"""
def main(): def domain_test():
""" """
Test regex checks domain regex test
""" """
print("=" * 30)
test_domains = [ test_domains = [
"example.com", "example.com",
"localhost", "localhost",
@@ -18,7 +32,7 @@ def main():
"some-domain.org" "some-domain.org"
] ]
regex_domain_check = re.compile(DOMAIN_WITH_LOCALHOST_REGEX) regex_domain_check = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
print(f"REGEX: {DOMAIN_WITH_LOCALHOST_REGEX}") print(f"REGEX: {DOMAIN_WITH_LOCALHOST_REGEX}")
print(f"Check regex: {regex_domain_check.search('localhost')}") print(f"Check regex: {regex_domain_check.search('localhost')}")
@@ -29,6 +43,66 @@ def main():
print(f"Did not match: {domain}") print(f"Did not match: {domain}")
def email_test():
"""
email regex test
"""
print("=" * 30)
email_list = """
e@bar.com
<f@foobar.com>
"Master" <foobar@bar.com>
"not valid" not@valid.com
also not valid not@valid.com
some header <something@bar.com>
test master <master@master.com>
日本語 <japan@jp.net>
"ひほん カケ苦" <foo@bar.com>
single@entry.com
arsch@popsch.com
test open <open@open.com>
"""
print(f"REGEX: SUB_EMAIL_BASIC_REGEX: {SUB_EMAIL_BASIC_REGEX}")
print(f"REGEX: EMAIL_BASIC_REGEX: {EMAIL_BASIC_REGEX}")
print(f"REGEX: COMPILED_NAME_EMAIL_SIMPLE_REGEX: {COMPILED_NAME_EMAIL_SIMPLE_REGEX}")
print(f"REGEX: NAME_EMAIL_BASIC_REGEX: {NAME_EMAIL_BASIC_REGEX}")
basic_email = COMPILED_EMAIL_BASIC_REGEX
sub_basic_email = compile_re(SUB_EMAIL_BASIC_REGEX)
simple_name_email_regex = COMPILED_NAME_EMAIL_SIMPLE_REGEX
full_name_email_regex = COMPILED_NAME_EMAIL_BASIC_REGEX
for email in email_list.splitlines():
email = email.strip()
if not email:
continue
print(f">>> Testing: {email}")
if not basic_email.match(email):
print(f"{Colors.red}[EMAIL ] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[EMAIL ] Matched : {email}{Colors.reset}")
if not sub_basic_email.match(email):
print(f"{Colors.red}[SUB ] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[SUB ] Matched : {email}{Colors.reset}")
if not simple_name_email_regex.match(email):
print(f"{Colors.red}[SIMPLE] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[SIMPLE] Matched : {email}{Colors.reset}")
if not full_name_email_regex.match(email):
print(f"{Colors.red}[FULL ] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[FULL ] Matched : {email}{Colors.reset}")
def main():
"""
Test regex checks
"""
domain_test()
email_test()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -12,10 +12,12 @@ some_match_list=foo,bar
test_list=a,b,c,d f, g h test_list=a,b,c,d f, g h
other_list=a|b|c|d| other_list=a|b|c|d|
third_list=xy|ab|df|fg third_list=xy|ab|df|fg
empty_list=
str_length=foobar str_length=foobar
int_range=20 int_range=20
int_range_not_set= int_range_not_set=
int_range_not_set_empty_set=5 int_range_not_set_empty_set=5
bool_var=True
# #
match_target=foo match_target=foo
match_target_list=foo,bar,baz match_target_list=foo,bar,baz

View File

@@ -21,11 +21,6 @@ def main():
Main run Main run
""" """
value = "2025/1/1"
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
result = regex_c.search(value)
print(f"regex {regex_c} check against {value} -> {result}")
# for log testing # for log testing
log = Log( log = Log(
log_path=ROOT_PATH.joinpath(LOG_DIR, 'settings_loader.log'), log_path=ROOT_PATH.joinpath(LOG_DIR, 'settings_loader.log'),
@@ -37,6 +32,11 @@ def main():
) )
log.logger.info('Settings loader') log.logger.info('Settings loader')
value = "2025/1/1"
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
result = regex_c.search(value)
log.info(f"regex {regex_c} check against {value} -> {result}")
sl = SettingsLoader( sl = SettingsLoader(
{ {
'overload_from_args': 'OVERLOAD from ARGS', 'overload_from_args': 'OVERLOAD from ARGS',
@@ -69,6 +69,9 @@ def main():
"split:|", "split:|",
"check:string.alphanumeric" "check:string.alphanumeric"
], ],
"empty_list": [
"split:,",
],
"str_length": [ "str_length": [
"length:2-10" "length:2-10"
], ],
@@ -81,6 +84,7 @@ def main():
"int_range_not_set_empty_set": [ "int_range_not_set_empty_set": [
"empty:" "empty:"
], ],
"bool_var": ["convert:bool"],
"match_target": ["matching:foo"], "match_target": ["matching:foo"],
"match_target_list": ["split:,", "matching:foo|bar|baz",], "match_target_list": ["split:,", "matching:foo|bar|baz",],
"match_source_a": ["in:match_target"], "match_source_a": ["in:match_target"],

View File

@@ -24,12 +24,19 @@ def main() -> None:
"lookup_value_c": "B02", "lookup_value_c": "B02",
"replace_value": "R02", "replace_value": "R02",
}, },
{
"lookup_value_p": "A03",
"lookup_value_c": "B03",
"replace_value": "R03",
},
] ]
test_foo = ArraySearchList( test_foo = ArraySearchList(
key = "lookup_value_p", key="lookup_value_p",
value = "A01" value="A01"
) )
print(test_foo) result = find_in_array_from_list(data, [test_foo])
print(f"Search A: {dump_data(test_foo)} -> {dump_data(result)}")
search: list[ArraySearchList] = [ search: list[ArraySearchList] = [
{ {
"key": "lookup_value_p", "key": "lookup_value_p",
@@ -38,12 +45,122 @@ def main() -> None:
{ {
"key": "lookup_value_c", "key": "lookup_value_c",
"value": "B01" "value": "B01"
},
]
result = find_in_array_from_list(data, search)
print(f"Search B: {dump_data(search)} -> {dump_data(result)}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": "A01"
},
{
"key": "lookup_value_c",
"value": "B01"
},
{
"key": "lookup_value_c",
"value": "B02"
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search C: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search C raised KeyError: {e}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": "A01"
},
{
"key": "lookup_value_c",
"value": ["B01", "B02"]
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search D: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search D raised KeyError: {e}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": ["A01", "A03"]
},
{
"key": "lookup_value_c",
"value": ["B01", "B02"]
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search E: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search E raised KeyError: {e}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": "NOT FOUND"
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search F: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search F raised KeyError: {e}")
data = [
{
"sd_user_id": "1593",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1592",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1596",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1594",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1595",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1861",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1862",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1860",
"email": "",
"employee_id": ""
} }
] ]
result = find_in_array_from_list(data, [ArraySearchList(
result = find_in_array_from_list(data, search) key="sd_user_id",
value="1593"
print(f"Search {dump_data(search)} -> {dump_data(result)}") )])
print(f"Search F: -> {dump_data(result)}")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -2,7 +2,10 @@
test list helpers test list helpers
""" """
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list from typing import Any
from corelibs.debug_handling.dump_data import dump_data
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list, make_unique_list_of_dicts
from corelibs.iterator_handling.fingerprint import dict_hash_crc
def __test_is_list_in_list_a(): def __test_is_list_in_list_a():
@@ -18,9 +21,66 @@ def __convert_list():
print(f"IN: {source} -> {result}") print(f"IN: {source} -> {result}")
def __make_unique_list_of_dicts():
dict_list = [
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
{"b": 2, "a": 1, "nested": {"y": 20, "x": 10}},
{"b": 2, "a": 1, "nested": {"y": 20, "x": 30}},
{"a": 3, "b": 4, "nested": {"x": 30, "y": 40}}
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list = [
{"a": 1, 1: "one"},
{1: "one", "a": 1},
{"a": 2, 1: "one"}
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list = [
{"a": 1, "b": [1, 2, 3]},
{"b": [1, 2, 3], "a": 1},
{"a": 1, "b": [1, 2, 4]},
1, 2, "String", 1, "Foobar"
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list: list[Any] = [
[],
{},
[],
{},
{"a": []},
{"a": []},
{"a": {}},
{"a": {}},
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list: list[Any] = [
(1, 2),
(1, 2),
(2, 3),
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
def main(): def main():
"""List helpers test runner"""
__test_is_list_in_list_a() __test_is_list_in_list_a()
__convert_list() __convert_list()
__make_unique_list_of_dicts()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -27,7 +27,8 @@ def main():
"per_run_log": True, "per_run_log": True,
# "console_format_type": ConsoleFormatSettings.NONE, # "console_format_type": ConsoleFormatSettings.NONE,
# "console_format_type": ConsoleFormatSettings.MINIMAL, # "console_format_type": ConsoleFormatSettings.MINIMAL,
"console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL, # "console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL,
"console_format_type": None,
# "console_format_type": ConsoleFormat.NAME, # "console_format_type": ConsoleFormat.NAME,
# "console_format_type": ( # "console_format_type": (
# ConsoleFormat.TIME | ConsoleFormat.TIMEZONE | ConsoleFormat.LINENO | ConsoleFormat.LEVEL # ConsoleFormat.TIME | ConsoleFormat.TIMEZONE | ConsoleFormat.LINENO | ConsoleFormat.LEVEL
@@ -121,10 +122,16 @@ def main():
log.set_log_level(Log.CONSOLE_HANDLER, LoggingLevel.DEBUG) log.set_log_level(Log.CONSOLE_HANDLER, LoggingLevel.DEBUG)
log.debug('Current logging format: %s', log.log_settings['console_format_type']) log.debug('Current logging format: %s', log.log_settings['console_format_type'])
log.debug('Current console formatter: %s', log.get_console_formatter())
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO) log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
log.info('Does hit show less') log.info('Does hit show less A')
log.debug('Current console formatter after A: %s', log.get_console_formatter())
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO) log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
log.info('Does hit show less B') log.info('Does hit show less B')
log.debug('Current console formatter after B: %s', log.get_console_formatter())
log.update_console_formatter(ConsoleFormatSettings.ALL)
log.info('Does hit show less C')
log.debug('Current console formatter after C: %s', log.get_console_formatter())
print(f"*** Any handler is minimum level ERROR: {log.any_handler_is_minimum_level(LoggingLevel.ERROR)}") print(f"*** Any handler is minimum level ERROR: {log.any_handler_is_minimum_level(LoggingLevel.ERROR)}")
print(f"*** Any handler is minimum level DEBUG: {log.any_handler_is_minimum_level(LoggingLevel.DEBUG)}") print(f"*** Any handler is minimum level DEBUG: {log.any_handler_is_minimum_level(LoggingLevel.DEBUG)}")

View File

@@ -8,10 +8,21 @@ import re
import pytest import pytest
from corelibs.check_handling.regex_constants import ( from corelibs.check_handling.regex_constants import (
compile_re, compile_re,
SUB_EMAIL_BASIC_REGEX,
EMAIL_BASIC_REGEX, EMAIL_BASIC_REGEX,
NAME_EMAIL_SIMPLE_REGEX,
NAME_EMAIL_BASIC_REGEX,
DOMAIN_WITH_LOCALHOST_REGEX, DOMAIN_WITH_LOCALHOST_REGEX,
DOMAIN_WITH_LOCALHOST_PORT_REGEX, DOMAIN_WITH_LOCALHOST_PORT_REGEX,
DOMAIN_REGEX, DOMAIN_REGEX
)
from corelibs.check_handling.regex_constants_compiled import (
COMPILED_EMAIL_BASIC_REGEX,
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
COMPILED_NAME_EMAIL_BASIC_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
COMPILED_DOMAIN_REGEX,
) )
@@ -48,7 +59,7 @@ class TestEmailBasicRegex:
@pytest.fixture @pytest.fixture
def email_pattern(self) -> re.Pattern[str]: def email_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled email regex pattern.""" """Fixture that returns compiled email regex pattern."""
return compile_re(EMAIL_BASIC_REGEX) return COMPILED_EMAIL_BASIC_REGEX
@pytest.mark.parametrize("valid_email", [ @pytest.mark.parametrize("valid_email", [
"user@example.com", "user@example.com",
@@ -123,13 +134,272 @@ class TestEmailBasicRegex:
assert not email_pattern.match(email) assert not email_pattern.match(email)
class TestSubEmailBasicRegex:
"""Test cases for SUB_EMAIL_BASIC_REGEX pattern (without anchors)."""
@pytest.fixture
def sub_email_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled sub email regex pattern."""
return compile_re(rf"^{SUB_EMAIL_BASIC_REGEX}$")
@pytest.mark.parametrize("valid_email", [
"user@example.com",
"test.user@example.com",
"user+tag@example.co.uk",
"first.last@subdomain.example.com",
"user123@test-domain.com",
"a@example.com",
"user_name@example.com",
"user-name@example.com",
"user@sub.domain.example.com",
"test!#$%&'*+-/=?^_`{|}~@example.com",
"1234567890@example.com",
])
def test_valid_emails_match(self, sub_email_pattern: re.Pattern[str], valid_email: str) -> None:
"""Test that valid email addresses match SUB_EMAIL_BASIC_REGEX."""
assert sub_email_pattern.match(valid_email), (
f"Failed to match valid email: {valid_email}"
)
@pytest.mark.parametrize("invalid_email", [
"",
"@example.com",
"user@",
"user",
"user@.com",
"user@domain",
"user @example.com",
".user@example.com",
"user@-example.com",
"user@example-.com",
"user@example.c",
"user@example.toolong",
])
def test_invalid_emails_no_match(self, sub_email_pattern: re.Pattern[str], invalid_email: str) -> None:
"""Test that invalid emails don't match SUB_EMAIL_BASIC_REGEX."""
assert not sub_email_pattern.match(invalid_email), (
f"Incorrectly matched invalid email: {invalid_email}"
)
def test_sub_email_max_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
"""Test email with maximum local part length (64 characters)."""
local_part = "a" * 64
email = f"{local_part}@example.com"
assert sub_email_pattern.match(email)
def test_sub_email_exceeds_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
"""Test email exceeding maximum local part length."""
local_part = "a" * 65
email = f"{local_part}@example.com"
assert not sub_email_pattern.match(email)
class TestNameEmailSimpleRegex:
"""Test cases for NAME_EMAIL_SIMPLE_REGEX pattern."""
@pytest.fixture
def name_email_simple_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled name+email simple regex pattern."""
return COMPILED_NAME_EMAIL_SIMPLE_REGEX
@pytest.mark.parametrize("test_input,expected_groups", [
('"John Doe" <john@example.com>', {'name1': 'John Doe', 'email1': 'john@example.com'}),
('John Doe <john@example.com>', {'name2': 'John Doe', 'email2': 'john@example.com'}),
('<john@example.com>', {'email3': 'john@example.com'}),
('john@example.com', {'email4': 'john@example.com'}),
(' "Jane Smith" <jane@test.com> ', {'name1': 'Jane Smith', 'email1': 'jane@test.com'}),
('Bob <bob@test.org>', {'name2': 'Bob', 'email2': 'bob@test.org'}),
])
def test_valid_name_email_combinations(
self, name_email_simple_pattern: re.Pattern[str], test_input: str, expected_groups: dict[str, str]
) -> None:
"""Test that valid name+email combinations match and extract correct groups."""
match = name_email_simple_pattern.match(test_input)
assert match is not None, f"Failed to match: {test_input}"
# Check that expected groups are present and match
for group_name, expected_value in expected_groups.items():
assert match.group(group_name) == expected_value, (
f"Group {group_name} expected '{expected_value}', got '{match.group(group_name)}'"
)
@pytest.mark.parametrize("invalid_input", [
"",
"not an email",
"<>",
'"Name Only"',
'Name <',
'<email',
'Name <<email@test.com>>',
'Name <email@test.com',
'Name email@test.com>',
])
def test_invalid_name_email_combinations(
self, name_email_simple_pattern: re.Pattern[str], invalid_input: str
) -> None:
"""Test that invalid inputs don't match NAME_EMAIL_SIMPLE_REGEX."""
assert not name_email_simple_pattern.match(invalid_input), (
f"Incorrectly matched invalid input: {invalid_input}"
)
def test_extract_name_from_quoted(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test extracting name from quoted format."""
match = name_email_simple_pattern.match('"Alice Wonder" <alice@example.com>')
assert match is not None
assert match.group('name1') == 'Alice Wonder'
assert match.group('email1') == 'alice@example.com'
def test_extract_name_from_unquoted(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test extracting name from unquoted format."""
match = name_email_simple_pattern.match('Bob Builder <bob@example.com>')
assert match is not None
assert match.group('name2') == 'Bob Builder'
assert match.group('email2') == 'bob@example.com'
def test_email_only_in_brackets(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test email-only format in angle brackets."""
match = name_email_simple_pattern.match('<charlie@example.com>')
assert match is not None
assert match.group('email3') == 'charlie@example.com'
def test_email_only_plain(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test plain email format without brackets."""
match = name_email_simple_pattern.match('dave@example.com')
assert match is not None
assert match.group('email4') == 'dave@example.com'
def test_whitespace_handling(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test that leading/trailing whitespace is handled correctly."""
match = name_email_simple_pattern.match(' "User Name" <user@example.com> ')
assert match is not None
assert match.group('name1') == 'User Name'
assert match.group('email1') == 'user@example.com'
class TestNameEmailBasicRegex:
"""Test cases for NAME_EMAIL_BASIC_REGEX pattern with strict email validation."""
@pytest.fixture
def name_email_basic_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled name+email basic regex pattern."""
return COMPILED_NAME_EMAIL_BASIC_REGEX
@pytest.mark.parametrize("test_input,expected_name,expected_email", [
('"John Doe" <john@example.com>', 'John Doe', 'john@example.com'),
('John Doe <john@example.com>', 'John Doe', 'john@example.com'),
('<john@example.com>', None, 'john@example.com'),
('john@example.com', None, 'john@example.com'),
(' "Jane Smith" <jane.smith@test.co.uk> ', 'Jane Smith', 'jane.smith@test.co.uk'),
('Alice Wonder <alice+tag@example.com>', 'Alice Wonder', 'alice+tag@example.com'),
])
def test_valid_name_email_with_validation(
self,
name_email_basic_pattern: re.Pattern[str],
test_input: str,
expected_name: str | None,
expected_email: str,
) -> None:
"""Test valid name+email with strict email validation."""
match = name_email_basic_pattern.match(test_input)
assert match is not None, f"Failed to match: {test_input}"
# Extract name and email from whichever group matched
name = match.group('name1') or match.group('name2')
email = (
match.group('email1') or match.group('email2') or
match.group('email3') or match.group('email4')
)
assert name == expected_name, f"Expected name '{expected_name}', got '{name}'"
assert email == expected_email, f"Expected email '{expected_email}', got '{email}'"
@pytest.mark.parametrize("invalid_input", [
'"John Doe" <invalid.email>', # invalid email format
'John Doe <@example.com>', # missing local part
'<user@>', # missing domain
'user@domain', # no TLD
'"Name" <user @example.com>', # space in email
'<.user@example.com>', # starts with dot
'user@-example.com', # domain starts with hyphen
'Name <user@example.c>', # TLD too short
'Name <user@example.toolongdomain>', # TLD too long
])
def test_invalid_email_format_rejected(
self, name_email_basic_pattern: re.Pattern[str], invalid_input: str
) -> None:
"""Test that inputs with invalid email formats are rejected."""
assert not name_email_basic_pattern.match(invalid_input), (
f"Incorrectly matched invalid input: {invalid_input}"
)
def test_quoted_name_with_valid_email(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test quoted name format with valid email."""
match = name_email_basic_pattern.match('"Alice Wonder" <alice@example.com>')
assert match is not None
assert match.group('name1') == 'Alice Wonder'
assert match.group('email1') == 'alice@example.com'
def test_unquoted_name_with_valid_email(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test unquoted name format with valid email."""
match = name_email_basic_pattern.match('Bob Builder <bob@example.com>')
assert match is not None
assert match.group('name2') == 'Bob Builder'
assert match.group('email2') == 'bob@example.com'
def test_email_only_formats(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test email-only formats (with and without brackets)."""
# With brackets
match1 = name_email_basic_pattern.match('<charlie@example.com>')
assert match1 is not None
assert match1.group('email3') == 'charlie@example.com'
# Without brackets
match2 = name_email_basic_pattern.match('dave@example.com')
assert match2 is not None
assert match2.group('email4') == 'dave@example.com'
def test_whitespace_handling(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test that leading/trailing whitespace is handled correctly."""
match = name_email_basic_pattern.match(' "User" <user@example.com> ')
assert match is not None
assert match.group('name1') == 'User'
assert match.group('email1') == 'user@example.com'
def test_special_characters_in_local_part(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test email with special characters in local part."""
match = name_email_basic_pattern.match('Test User <test!#$%&\'*+-/=?^_`{|}~@example.com>')
assert match is not None
assert match.group('name2') == 'Test User'
assert match.group('email2') == 'test!#$%&\'*+-/=?^_`{|}~@example.com'
class TestDomainWithLocalhostRegex: class TestDomainWithLocalhostRegex:
"""Test cases for DOMAIN_WITH_LOCALHOST_REGEX pattern.""" """Test cases for DOMAIN_WITH_LOCALHOST_REGEX pattern."""
@pytest.fixture @pytest.fixture
def domain_localhost_pattern(self) -> re.Pattern[str]: def domain_localhost_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled domain with localhost regex pattern.""" """Fixture that returns compiled domain with localhost regex pattern."""
return compile_re(DOMAIN_WITH_LOCALHOST_REGEX) return COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
@pytest.mark.parametrize("valid_domain", [ @pytest.mark.parametrize("valid_domain", [
"localhost", "localhost",
@@ -181,7 +451,7 @@ class TestDomainWithLocalhostPortRegex:
@pytest.fixture @pytest.fixture
def domain_localhost_port_pattern(self) -> re.Pattern[str]: def domain_localhost_port_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled domain and localhost with port pattern.""" """Fixture that returns compiled domain and localhost with port pattern."""
return compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX) return COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
@pytest.mark.parametrize("valid_domain", [ @pytest.mark.parametrize("valid_domain", [
"localhost", "localhost",
@@ -247,7 +517,7 @@ class TestDomainRegex:
@pytest.fixture @pytest.fixture
def domain_pattern(self) -> re.Pattern[str]: def domain_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled domain regex pattern.""" """Fixture that returns compiled domain regex pattern."""
return compile_re(DOMAIN_REGEX) return COMPILED_DOMAIN_REGEX
@pytest.mark.parametrize("valid_domain", [ @pytest.mark.parametrize("valid_domain", [
"example.com", "example.com",
@@ -306,6 +576,8 @@ class TestRegexPatternConsistency:
"""Test that all regex patterns can be compiled without errors.""" """Test that all regex patterns can be compiled without errors."""
patterns = [ patterns = [
EMAIL_BASIC_REGEX, EMAIL_BASIC_REGEX,
NAME_EMAIL_SIMPLE_REGEX,
NAME_EMAIL_BASIC_REGEX,
DOMAIN_WITH_LOCALHOST_REGEX, DOMAIN_WITH_LOCALHOST_REGEX,
DOMAIN_WITH_LOCALHOST_PORT_REGEX, DOMAIN_WITH_LOCALHOST_PORT_REGEX,
DOMAIN_REGEX, DOMAIN_REGEX,
@@ -314,9 +586,24 @@ class TestRegexPatternConsistency:
compiled = compile_re(pattern) compiled = compile_re(pattern)
assert isinstance(compiled, re.Pattern) assert isinstance(compiled, re.Pattern)
def test_compiled_patterns_are_patterns(self) -> None:
"""Test that all COMPILED_ constants are Pattern objects."""
compiled_patterns = [
COMPILED_EMAIL_BASIC_REGEX,
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
COMPILED_NAME_EMAIL_BASIC_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
COMPILED_DOMAIN_REGEX,
]
for pattern in compiled_patterns:
assert isinstance(pattern, re.Pattern)
def test_domain_patterns_are_strings(self) -> None: def test_domain_patterns_are_strings(self) -> None:
"""Test that all regex constants are strings.""" """Test that all regex constants are strings."""
assert isinstance(EMAIL_BASIC_REGEX, str) assert isinstance(EMAIL_BASIC_REGEX, str)
assert isinstance(NAME_EMAIL_SIMPLE_REGEX, str)
assert isinstance(NAME_EMAIL_BASIC_REGEX, str)
assert isinstance(DOMAIN_WITH_LOCALHOST_REGEX, str) assert isinstance(DOMAIN_WITH_LOCALHOST_REGEX, str)
assert isinstance(DOMAIN_WITH_LOCALHOST_PORT_REGEX, str) assert isinstance(DOMAIN_WITH_LOCALHOST_PORT_REGEX, str)
assert isinstance(DOMAIN_REGEX, str) assert isinstance(DOMAIN_REGEX, str)
@@ -325,8 +612,8 @@ class TestRegexPatternConsistency:
"""Test that domain patterns follow expected hierarchy.""" """Test that domain patterns follow expected hierarchy."""
# DOMAIN_WITH_LOCALHOST_PORT_REGEX should accept everything # DOMAIN_WITH_LOCALHOST_PORT_REGEX should accept everything
# DOMAIN_WITH_LOCALHOST_REGEX accepts # DOMAIN_WITH_LOCALHOST_REGEX accepts
domain_localhost = compile_re(DOMAIN_WITH_LOCALHOST_REGEX) domain_localhost = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
domain_localhost_port = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX) domain_localhost_port = COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
test_cases = ["example.com", "subdomain.example.com", "localhost"] test_cases = ["example.com", "subdomain.example.com", "localhost"]
for test_case in test_cases: for test_case in test_cases:

View File

@@ -16,7 +16,7 @@ class TestSettingsLoaderInit:
def test_init_with_valid_config_file(self, tmp_path: Path): def test_init_with_valid_config_file(self, tmp_path: Path):
"""Test initialization with a valid config file""" """Test initialization with a valid config file"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[Section]\nkey=value\n") config_file.write_text("[Section]\nkey=value\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -35,7 +35,7 @@ class TestSettingsLoaderInit:
def test_init_with_missing_config_file(self, tmp_path: Path): def test_init_with_missing_config_file(self, tmp_path: Path):
"""Test initialization with missing config file""" """Test initialization with missing config file"""
config_file = tmp_path / "missing.ini" config_file = tmp_path.joinpath("missing.ini")
loader = SettingsLoader( loader = SettingsLoader(
args={}, args={},
@@ -60,7 +60,7 @@ class TestSettingsLoaderInit:
def test_init_with_log(self, tmp_path: Path): def test_init_with_log(self, tmp_path: Path):
"""Test initialization with Log object""" """Test initialization with Log object"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[Section]\nkey=value\n") config_file.write_text("[Section]\nkey=value\n")
mock_log = Mock(spec=Log) mock_log = Mock(spec=Log)
@@ -80,7 +80,7 @@ class TestLoadSettings:
def test_load_settings_basic(self, tmp_path: Path): def test_load_settings_basic(self, tmp_path: Path):
"""Test loading basic settings without validation""" """Test loading basic settings without validation"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nkey1=value1\nkey2=value2\n") config_file.write_text("[TestSection]\nkey1=value1\nkey2=value2\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -90,7 +90,7 @@ class TestLoadSettings:
def test_load_settings_with_missing_section(self, tmp_path: Path): def test_load_settings_with_missing_section(self, tmp_path: Path):
"""Test loading settings with missing section""" """Test loading settings with missing section"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[OtherSection]\nkey=value\n") config_file.write_text("[OtherSection]\nkey=value\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -100,7 +100,7 @@ class TestLoadSettings:
def test_load_settings_allow_not_exist(self, tmp_path: Path): def test_load_settings_allow_not_exist(self, tmp_path: Path):
"""Test loading settings with allow_not_exist flag""" """Test loading settings with allow_not_exist flag"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[OtherSection]\nkey=value\n") config_file.write_text("[OtherSection]\nkey=value\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -110,7 +110,7 @@ class TestLoadSettings:
def test_load_settings_mandatory_field_present(self, tmp_path: Path): def test_load_settings_mandatory_field_present(self, tmp_path: Path):
"""Test mandatory field validation when field is present""" """Test mandatory field validation when field is present"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nrequired_field=value\n") config_file.write_text("[TestSection]\nrequired_field=value\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -123,7 +123,7 @@ class TestLoadSettings:
def test_load_settings_mandatory_field_missing(self, tmp_path: Path): def test_load_settings_mandatory_field_missing(self, tmp_path: Path):
"""Test mandatory field validation when field is missing""" """Test mandatory field validation when field is missing"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nother_field=value\n") config_file.write_text("[TestSection]\nother_field=value\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -136,7 +136,7 @@ class TestLoadSettings:
def test_load_settings_mandatory_field_empty(self, tmp_path: Path): def test_load_settings_mandatory_field_empty(self, tmp_path: Path):
"""Test mandatory field validation when field is empty""" """Test mandatory field validation when field is empty"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nrequired_field=\n") config_file.write_text("[TestSection]\nrequired_field=\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -149,7 +149,7 @@ class TestLoadSettings:
def test_load_settings_with_split(self, tmp_path: Path): def test_load_settings_with_split(self, tmp_path: Path):
"""Test splitting values into lists""" """Test splitting values into lists"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a,b,c,d\n") config_file.write_text("[TestSection]\nlist_field=a,b,c,d\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -162,7 +162,7 @@ class TestLoadSettings:
def test_load_settings_with_custom_split_char(self, tmp_path: Path): def test_load_settings_with_custom_split_char(self, tmp_path: Path):
"""Test splitting with custom delimiter""" """Test splitting with custom delimiter"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a|b|c|d\n") config_file.write_text("[TestSection]\nlist_field=a|b|c|d\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -175,7 +175,7 @@ class TestLoadSettings:
def test_load_settings_split_removes_spaces(self, tmp_path: Path): def test_load_settings_split_removes_spaces(self, tmp_path: Path):
"""Test that split removes spaces from values""" """Test that split removes spaces from values"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a, b , c , d\n") config_file.write_text("[TestSection]\nlist_field=a, b , c , d\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -188,7 +188,7 @@ class TestLoadSettings:
def test_load_settings_empty_split_char_fallback(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_empty_split_char_fallback(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test fallback to default split char when empty""" """Test fallback to default split char when empty"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a,b,c\n") config_file.write_text("[TestSection]\nlist_field=a,b,c\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -201,9 +201,22 @@ class TestLoadSettings:
captured = capsys.readouterr() captured = capsys.readouterr()
assert "fallback to:" in captured.out assert "fallback to:" in captured.out
def test_load_settings_split_empty_value(self, tmp_path: Path):
"""Test that split on empty value results in empty list"""
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=\n")
loader = SettingsLoader(args={}, config_file=config_file)
result = loader.load_settings(
"TestSection",
{"list_field": ["split:,"]}
)
assert result["list_field"] == []
def test_load_settings_convert_to_int(self, tmp_path: Path): def test_load_settings_convert_to_int(self, tmp_path: Path):
"""Test converting values to int""" """Test converting values to int"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=123\n") config_file.write_text("[TestSection]\nnumber=123\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
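The test_load_settings_split_empty_value case added above pins down the split rule: an empty config value must split to an empty list, not to [""]. A minimal sketch of that rule, assuming a hypothetical standalone helper (split_config_value) rather than the SettingsLoader internals:

def split_config_value(value: str, split_char: str = ",") -> list[str]:
    """Split a raw config value into a trimmed list; an empty value yields []."""
    if not split_char:
        split_char = ","  # fall back to the default delimiter
    if value == "":
        return []  # avoid "".split(",") == [""]
    return [part.strip() for part in value.split(split_char)]


# Expected behaviour mirrored from the tests above:
assert split_config_value("a, b , c , d") == ["a", "b", "c", "d"]
assert split_config_value("a|b|c|d", "|") == ["a", "b", "c", "d"]
assert split_config_value("") == []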
@@ -217,7 +230,7 @@ class TestLoadSettings:
def test_load_settings_convert_to_float(self, tmp_path: Path): def test_load_settings_convert_to_float(self, tmp_path: Path):
"""Test converting values to float""" """Test converting values to float"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=123.45\n") config_file.write_text("[TestSection]\nnumber=123.45\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -231,7 +244,7 @@ class TestLoadSettings:
def test_load_settings_convert_to_bool_true(self, tmp_path: Path): def test_load_settings_convert_to_bool_true(self, tmp_path: Path):
"""Test converting values to boolean True""" """Test converting values to boolean True"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nflag1=true\nflag2=True\n") config_file.write_text("[TestSection]\nflag1=true\nflag2=True\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -245,7 +258,7 @@ class TestLoadSettings:
def test_load_settings_convert_to_bool_false(self, tmp_path: Path): def test_load_settings_convert_to_bool_false(self, tmp_path: Path):
"""Test converting values to boolean False""" """Test converting values to boolean False"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nflag1=false\nflag2=False\n") config_file.write_text("[TestSection]\nflag1=false\nflag2=False\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
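The two boolean hunks above exercise a convert-to-bool rule where "true"/"True" become True and "false"/"False" become False. A hedged sketch of such a converter; the real SettingsLoader conversion may accept more spellings or report failures differently:

def convert_bool_sketch(value: str) -> bool:
    """Map the config strings 'true'/'false' (any case) to booleans."""
    lowered = value.strip().lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    raise ValueError(f"cannot convert {value!r} to bool")


assert convert_bool_sketch("true") is True
assert convert_bool_sketch("True") is True
assert convert_bool_sketch("False") is False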
@@ -259,7 +272,7 @@ class TestLoadSettings:
def test_load_settings_convert_invalid_type(self, tmp_path: Path): def test_load_settings_convert_invalid_type(self, tmp_path: Path):
"""Test converting with invalid type raises error""" """Test converting with invalid type raises error"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n") config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -272,7 +285,7 @@ class TestLoadSettings:
def test_load_settings_empty_set_to_none(self, tmp_path: Path): def test_load_settings_empty_set_to_none(self, tmp_path: Path):
"""Test setting empty values to None""" """Test setting empty values to None"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nother=value\n") config_file.write_text("[TestSection]\nother=value\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -285,7 +298,7 @@ class TestLoadSettings:
def test_load_settings_empty_set_to_custom_value(self, tmp_path: Path): def test_load_settings_empty_set_to_custom_value(self, tmp_path: Path):
"""Test setting empty values to custom value""" """Test setting empty values to custom value"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nother=value\n") config_file.write_text("[TestSection]\nother=value\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -298,7 +311,7 @@ class TestLoadSettings:
def test_load_settings_matching_valid(self, tmp_path: Path): def test_load_settings_matching_valid(self, tmp_path: Path):
"""Test matching validation with valid value""" """Test matching validation with valid value"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nmode=production\n") config_file.write_text("[TestSection]\nmode=production\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -311,7 +324,7 @@ class TestLoadSettings:
def test_load_settings_matching_invalid(self, tmp_path: Path): def test_load_settings_matching_invalid(self, tmp_path: Path):
"""Test matching validation with invalid value""" """Test matching validation with invalid value"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nmode=invalid\n") config_file.write_text("[TestSection]\nmode=invalid\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -324,7 +337,7 @@ class TestLoadSettings:
def test_load_settings_in_valid(self, tmp_path: Path): def test_load_settings_in_valid(self, tmp_path: Path):
"""Test 'in' validation with valid value""" """Test 'in' validation with valid value"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=b\n") config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=b\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -340,7 +353,7 @@ class TestLoadSettings:
def test_load_settings_in_invalid(self, tmp_path: Path): def test_load_settings_in_invalid(self, tmp_path: Path):
"""Test 'in' validation with invalid value""" """Test 'in' validation with invalid value"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=d\n") config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=d\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -356,7 +369,7 @@ class TestLoadSettings:
def test_load_settings_in_missing_target(self, tmp_path: Path): def test_load_settings_in_missing_target(self, tmp_path: Path):
"""Test 'in' validation with missing target""" """Test 'in' validation with missing target"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=a\n") config_file.write_text("[TestSection]\nvalue=a\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -369,7 +382,7 @@ class TestLoadSettings:
def test_load_settings_length_exact(self, tmp_path: Path): def test_load_settings_length_exact(self, tmp_path: Path):
"""Test length validation with exact match""" """Test length validation with exact match"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n") config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -382,7 +395,7 @@ class TestLoadSettings:
def test_load_settings_length_exact_invalid(self, tmp_path: Path): def test_load_settings_length_exact_invalid(self, tmp_path: Path):
"""Test length validation with exact match failure""" """Test length validation with exact match failure"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n") config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -395,7 +408,7 @@ class TestLoadSettings:
def test_load_settings_length_range(self, tmp_path: Path): def test_load_settings_length_range(self, tmp_path: Path):
"""Test length validation with range""" """Test length validation with range"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=testing\n") config_file.write_text("[TestSection]\nvalue=testing\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -408,7 +421,7 @@ class TestLoadSettings:
def test_load_settings_length_min_only(self, tmp_path: Path): def test_load_settings_length_min_only(self, tmp_path: Path):
"""Test length validation with minimum only""" """Test length validation with minimum only"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=testing\n") config_file.write_text("[TestSection]\nvalue=testing\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -421,7 +434,7 @@ class TestLoadSettings:
def test_load_settings_length_max_only(self, tmp_path: Path): def test_load_settings_length_max_only(self, tmp_path: Path):
"""Test length validation with maximum only""" """Test length validation with maximum only"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n") config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -434,7 +447,7 @@ class TestLoadSettings:
def test_load_settings_range_valid(self, tmp_path: Path): def test_load_settings_range_valid(self, tmp_path: Path):
"""Test range validation with valid value""" """Test range validation with valid value"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=25\n") config_file.write_text("[TestSection]\nnumber=25\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -447,7 +460,7 @@ class TestLoadSettings:
def test_load_settings_range_invalid(self, tmp_path: Path): def test_load_settings_range_invalid(self, tmp_path: Path):
"""Test range validation with invalid value""" """Test range validation with invalid value"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=100\n") config_file.write_text("[TestSection]\nnumber=100\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -460,7 +473,7 @@ class TestLoadSettings:
def test_load_settings_check_int_valid(self, tmp_path: Path): def test_load_settings_check_int_valid(self, tmp_path: Path):
"""Test check:int with valid integer""" """Test check:int with valid integer"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=12345\n") config_file.write_text("[TestSection]\nnumber=12345\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -473,7 +486,7 @@ class TestLoadSettings:
def test_load_settings_check_int_cleanup(self, tmp_path: Path): def test_load_settings_check_int_cleanup(self, tmp_path: Path):
"""Test check:int with cleanup""" """Test check:int with cleanup"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=12a34b5\n") config_file.write_text("[TestSection]\nnumber=12a34b5\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
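The cleanup test above feeds "12a34b5" through check:int; the hunk is truncated before its assertion, so the exact cleanup rule is not visible here. One plausible reading, purely as an assumption, is that cleanup strips non-digit characters before the integer check:

import re


def clean_int_sketch(value: str) -> int:
    """Assumed cleanup: drop non-digit characters, then parse as int."""
    digits = re.sub(r"\D", "", value)
    if not digits:
        raise ValueError(f"no digits in {value!r}")
    return int(digits)


assert clean_int_sketch("12345") == 12345
assert clean_int_sketch("12a34b5") == 12345  # assumption: cleanup keeps digits only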
@@ -486,7 +499,7 @@ class TestLoadSettings:
def test_load_settings_check_email_valid(self, tmp_path: Path): def test_load_settings_check_email_valid(self, tmp_path: Path):
"""Test check:string.email.basic with valid email""" """Test check:string.email.basic with valid email"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nemail=test@example.com\n") config_file.write_text("[TestSection]\nemail=test@example.com\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -499,7 +512,7 @@ class TestLoadSettings:
def test_load_settings_check_email_invalid(self, tmp_path: Path): def test_load_settings_check_email_invalid(self, tmp_path: Path):
"""Test check:string.email.basic with invalid email""" """Test check:string.email.basic with invalid email"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nemail=not-an-email\n") config_file.write_text("[TestSection]\nemail=not-an-email\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
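The email checks above accept "test@example.com" and reject "not-an-email". As an illustration only (the actual string.email.basic pattern lives in the library's regex constants and may be stricter), a basic pattern with that behaviour:

import re

# Hedged stand-in for a basic email pattern; not the library's constant.
EMAIL_BASIC_SKETCH = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

assert EMAIL_BASIC_SKETCH.match("test@example.com") is not None
assert EMAIL_BASIC_SKETCH.match("not-an-email") is None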
@@ -512,7 +525,7 @@ class TestLoadSettings:
def test_load_settings_args_override(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_args_override(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test command line arguments override config values""" """Test command line arguments override config values"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n") config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -530,7 +543,7 @@ class TestLoadSettings:
def test_load_settings_args_no_flag(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_args_no_flag(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test default behavior (no args_override:yes) with list argument that has split""" """Test default behavior (no args_override:yes) with list argument that has split"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=a,b,c\n") config_file.write_text("[TestSection]\nvalue=a,b,c\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -550,7 +563,7 @@ class TestLoadSettings:
def test_load_settings_args_list_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_args_list_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test that list arguments without split entry are skipped""" """Test that list arguments without split entry are skipped"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n") config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -570,7 +583,7 @@ class TestLoadSettings:
def test_load_settings_args_list_with_split(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_args_list_with_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test that list arguments with split entry and args_override:yes are applied""" """Test that list arguments with split entry and args_override:yes are applied"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=a,b,c\n") config_file.write_text("[TestSection]\nvalue=a,b,c\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -589,7 +602,7 @@ class TestLoadSettings:
def test_load_settings_args_no_with_mandatory(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_args_no_with_mandatory(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test default behavior (no args_override:yes) with mandatory field and list args with split""" """Test default behavior (no args_override:yes) with mandatory field and list args with split"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config1,config2\n") config_file.write_text("[TestSection]\nvalue=config1,config2\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -609,7 +622,7 @@ class TestLoadSettings:
def test_load_settings_args_no_with_mandatory_valid(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_args_no_with_mandatory_valid(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test default behavior with string args (always overrides due to current logic)""" """Test default behavior with string args (always overrides due to current logic)"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n") config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -628,7 +641,7 @@ class TestLoadSettings:
def test_load_settings_args_string_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_load_settings_args_string_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test that string arguments with args_override:yes work normally""" """Test that string arguments with args_override:yes work normally"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n") config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader( loader = SettingsLoader(
@@ -647,7 +660,7 @@ class TestLoadSettings:
def test_load_settings_no_config_file_with_args(self, tmp_path: Path): def test_load_settings_no_config_file_with_args(self, tmp_path: Path):
"""Test loading settings without config file but with mandatory args""" """Test loading settings without config file but with mandatory args"""
config_file = tmp_path / "missing.ini" config_file = tmp_path.joinpath("missing.ini")
loader = SettingsLoader( loader = SettingsLoader(
args={"required": "value"}, args={"required": "value"},
@@ -662,7 +675,7 @@ class TestLoadSettings:
def test_load_settings_no_config_file_missing_args(self, tmp_path: Path): def test_load_settings_no_config_file_missing_args(self, tmp_path: Path):
"""Test loading settings without config file and missing args""" """Test loading settings without config file and missing args"""
config_file = tmp_path / "missing.ini" config_file = tmp_path.joinpath("missing.ini")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -674,7 +687,7 @@ class TestLoadSettings:
def test_load_settings_check_list_with_split(self, tmp_path: Path): def test_load_settings_check_list_with_split(self, tmp_path: Path):
"""Test check validation with list values""" """Test check validation with list values"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist=abc,def,ghi\n") config_file.write_text("[TestSection]\nlist=abc,def,ghi\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -687,7 +700,7 @@ class TestLoadSettings:
def test_load_settings_check_list_cleanup(self, tmp_path: Path): def test_load_settings_check_list_cleanup(self, tmp_path: Path):
"""Test check validation cleans up list values""" """Test check validation cleans up list values"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist=ab-c,de_f,gh!i\n") config_file.write_text("[TestSection]\nlist=ab-c,de_f,gh!i\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -700,7 +713,7 @@ class TestLoadSettings:
def test_load_settings_invalid_check_type(self, tmp_path: Path): def test_load_settings_invalid_check_type(self, tmp_path: Path):
"""Test with invalid check type""" """Test with invalid check type"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n") config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file) loader = SettingsLoader(args={}, config_file=config_file)
@@ -717,7 +730,7 @@ class TestComplexScenarios:
def test_complex_validation_scenario(self, tmp_path: Path): def test_complex_validation_scenario(self, tmp_path: Path):
"""Test complex scenario with multiple validations""" """Test complex scenario with multiple validations"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text( config_file.write_text(
"[Production]\n" "[Production]\n"
"environment=production\n" "environment=production\n"
@@ -758,7 +771,7 @@ class TestComplexScenarios:
def test_email_list_validation(self, tmp_path: Path): def test_email_list_validation(self, tmp_path: Path):
"""Test email list with validation""" """Test email list with validation"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text( config_file.write_text(
"[EmailConfig]\n" "[EmailConfig]\n"
"emails=test@example.com,admin@domain.org,user+tag@site.co.uk\n" "emails=test@example.com,admin@domain.org,user+tag@site.co.uk\n"
@@ -775,7 +788,7 @@ class TestComplexScenarios:
def test_mixed_args_and_config(self, tmp_path: Path): def test_mixed_args_and_config(self, tmp_path: Path):
"""Test mixing command line args and config file""" """Test mixing command line args and config file"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text( config_file.write_text(
"[Settings]\n" "[Settings]\n"
"value1=config_value1\n" "value1=config_value1\n"
@@ -796,7 +809,7 @@ class TestComplexScenarios:
def test_multiple_check_types(self, tmp_path: Path): def test_multiple_check_types(self, tmp_path: Path):
"""Test multiple different check types""" """Test multiple different check types"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text( config_file.write_text(
"[Checks]\n" "[Checks]\n"
"numbers=123,456,789\n" "numbers=123,456,789\n"
@@ -823,7 +836,7 @@ class TestComplexScenarios:
def test_args_no_and_list_skip_combination(self, tmp_path: Path, capsys: CaptureFixture[str]): def test_args_no_and_list_skip_combination(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test combination of args_override:yes flag and list argument skip behavior""" """Test combination of args_override:yes flag and list argument skip behavior"""
config_file = tmp_path / "test.ini" config_file = tmp_path.joinpath("test.ini")
config_file.write_text( config_file.write_text(
"[Settings]\n" "[Settings]\n"
"no_override=a,b,c\n" "no_override=a,b,c\n"

@@ -4,7 +4,101 @@ tests for corelibs.iterator_handling.fingerprint
from typing import Any from typing import Any
import pytest import pytest
from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc, hash_object
class TestHashObject:
"""Tests for hash_object function"""
def test_hash_object_simple_dict(self):
"""Test hashing a simple dictionary with hash_object"""
data = {"key1": "value1", "key2": "value2"}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64 # SHA256 produces 64 hex characters
def test_hash_object_mixed_keys(self):
"""Test hash_object with mixed int and string keys"""
data = {"key1": "value1", 1: "value2", 2: "value3"}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_consistency(self):
"""Test that hash_object produces consistent results"""
data = {"str_key": "value", 123: "number_key"}
hash1 = hash_object(data)
hash2 = hash_object(data)
assert hash1 == hash2
def test_hash_object_order_independence(self):
"""Test that hash_object is order-independent"""
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
hash1 = hash_object(data1)
hash2 = hash_object(data2)
assert hash1 == hash2
def test_hash_object_list_of_dicts_mixed_keys(self):
"""Test hash_object with list of dicts containing mixed keys"""
data = [
{"name": "item1", 1: "value1"},
{"name": "item2", 2: "value2"}
]
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_nested_mixed_keys(self):
"""Test hash_object with nested structures containing mixed keys"""
data = {
"outer": {
"inner": "value",
1: "mixed_key"
},
2: "another_mixed"
}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_different_data(self):
"""Test that different data produces different hashes"""
data1 = {"key": "value", 1: "one"}
data2 = {"key": "value", 2: "two"}
hash1 = hash_object(data1)
hash2 = hash_object(data2)
assert hash1 != hash2
def test_hash_object_complex_nested(self):
"""Test hash_object with complex nested structures"""
data = {
"level1": {
"level2": {
1: "value",
"key": [1, 2, {"nested": "deep", 3: "int_key"}]
}
}
}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_list_with_tuples(self):
"""Test hash_object with lists containing tuples"""
data = [("a", 1), ("b", 2), {1: "mixed", "key": "value"}]
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
class TestDictHashFrozen: class TestDictHashFrozen:
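The TestHashObject cases above only constrain observable behaviour: a 64-character SHA-256 hex digest, the same digest regardless of key order, and tolerance of dictionaries that mix int and str keys. One way to meet those constraints, offered as an assumption rather than the actual hash_object implementation, is to canonicalise the structure (type-tagged string keys, sorted) before hashing:

import hashlib
import json
from typing import Any


def hash_object_sketch(data: Any) -> str:
    """Order-independent SHA-256 hex digest over nested dicts/lists/tuples."""
    def canonical(obj: Any) -> Any:
        if isinstance(obj, dict):
            # Tag keys with their type so int and str keys can coexist.
            return {f"{type(k).__name__}:{k}": canonical(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [canonical(v) for v in obj]
        return obj

    payload = json.dumps(canonical(data), sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


digest = hash_object_sketch({"a": 1, 1: "one"})
assert len(digest) == 64
assert digest == hash_object_sketch({1: "one", "a": 1})  # key order does not matter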
@@ -279,6 +373,116 @@ class TestDictHashCrc:
assert isinstance(result, str) assert isinstance(result, str)
assert len(result) == 64 assert len(result) == 64
def test_dict_hash_crc_fallback_mixed_keys(self):
"""Test dict_hash_crc fallback with mixed int and string keys"""
data = {"key1": "value1", 1: "value2", 2: "value3"}
result = dict_hash_crc(data)
assert isinstance(result, str)
# Fallback prefixes with "HO_"
assert result.startswith("HO_")
# Hash should be 64 chars + 3 char prefix = 67 total
assert len(result) == 67
def test_dict_hash_crc_fallback_consistency(self):
"""Test that fallback produces consistent hashes"""
data = {"str_key": "value", 123: "number_key", 456: "another"}
hash1 = dict_hash_crc(data)
hash2 = dict_hash_crc(data)
assert hash1 == hash2
assert hash1.startswith("HO_")
def test_dict_hash_crc_fallback_order_independence(self):
"""Test that fallback is order-independent for mixed-key dicts"""
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
hash1 = dict_hash_crc(data1)
hash2 = dict_hash_crc(data2)
assert hash1 == hash2
assert hash1.startswith("HO_")
def test_dict_hash_crc_fallback_list_of_dicts_mixed_keys(self):
"""Test fallback with list of dicts containing mixed keys"""
data = [
{"name": "item1", 1: "value1"},
{"name": "item2", 2: "value2"},
{3: "value3", "type": "mixed"}
]
result = dict_hash_crc(data)
assert isinstance(result, str)
assert result.startswith("HO_")
assert len(result) == 67
def test_dict_hash_crc_fallback_nested_mixed_keys(self):
"""Test fallback with nested dicts containing mixed keys"""
data = {
"outer": {
"inner": "value",
1: "mixed_key"
},
2: "another_mixed"
}
result = dict_hash_crc(data)
assert isinstance(result, str)
assert result.startswith("HO_")
assert len(result) == 67
def test_dict_hash_crc_fallback_different_data(self):
"""Test that different mixed-key data produces different hashes"""
data1 = {"key": "value", 1: "one"}
data2 = {"key": "value", 2: "two"}
hash1 = dict_hash_crc(data1)
hash2 = dict_hash_crc(data2)
assert hash1 != hash2
assert hash1.startswith("HO_")
assert hash2.startswith("HO_")
def test_dict_hash_crc_fallback_complex_structure(self):
"""Test fallback with complex nested structure with mixed keys"""
data = [
{
"id": 1,
1: "first",
"data": {
"nested": "value",
100: "nested_int_key"
}
},
{
"id": 2,
2: "second",
"items": [1, 2, 3]
}
]
result = dict_hash_crc(data)
assert isinstance(result, str)
assert result.startswith("HO_")
assert len(result) == 67
def test_dict_hash_crc_no_fallback_string_keys_only(self):
"""Test that string-only keys don't trigger fallback"""
data = {"key1": "value1", "key2": "value2", "key3": "value3"}
result = dict_hash_crc(data)
assert isinstance(result, str)
assert not result.startswith("HO_")
assert len(result) == 64
def test_dict_hash_crc_no_fallback_int_keys_only(self):
"""Test that int-only keys don't trigger fallback"""
data = {1: "one", 2: "two", 3: "three"}
result = dict_hash_crc(data)
assert isinstance(result, str)
assert not result.startswith("HO_")
assert len(result) == 64
class TestComparisonBetweenHashFunctions: class TestComparisonBetweenHashFunctions:
"""Tests comparing dict_hash_frozen and dict_hash_crc""" """Tests comparing dict_hash_frozen and dict_hash_crc"""

@@ -4,7 +4,7 @@ iterator_handling.list_helepr tests
from typing import Any from typing import Any
import pytest import pytest
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list, make_unique_list_of_dicts
class TestConvertToList: class TestConvertToList:
@@ -298,3 +298,225 @@ class TestPerformance:
# Should still work correctly despite duplicates # Should still work correctly despite duplicates
assert set(result) == {1, 3} assert set(result) == {1, 3}
assert isinstance(result, list) assert isinstance(result, list)
class TestMakeUniqueListOfDicts:
"""Test cases for make_unique_list_of_dicts function"""
def test_basic_duplicate_removal(self):
"""Test basic removal of duplicate dictionaries"""
dict_list = [
{"a": 1, "b": 2},
{"a": 1, "b": 2},
{"a": 3, "b": 4}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": 1, "b": 2} in result
assert {"a": 3, "b": 4} in result
def test_order_independent_duplicates(self):
"""Test that dictionaries with different key orders are treated as duplicates"""
dict_list = [
{"a": 1, "b": 2},
{"b": 2, "a": 1}, # Same content, different order
{"a": 3, "b": 4}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": 1, "b": 2} in result
assert {"a": 3, "b": 4} in result
def test_empty_list(self):
"""Test with empty list"""
result = make_unique_list_of_dicts([])
assert result == []
assert isinstance(result, list)
def test_single_dict(self):
"""Test with single dictionary"""
dict_list = [{"a": 1, "b": 2}]
result = make_unique_list_of_dicts(dict_list)
assert result == [{"a": 1, "b": 2}]
def test_all_unique(self):
"""Test when all dictionaries are unique"""
dict_list = [
{"a": 1},
{"b": 2},
{"c": 3},
{"d": 4}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 4
for d in dict_list:
assert d in result
def test_all_duplicates(self):
"""Test when all dictionaries are duplicates"""
dict_list = [
{"a": 1, "b": 2},
{"a": 1, "b": 2},
{"a": 1, "b": 2},
{"b": 2, "a": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 1
assert result[0] == {"a": 1, "b": 2}
def test_nested_values(self):
"""Test with nested structures as values"""
dict_list = [
{"a": [1, 2], "b": 3},
{"a": [1, 2], "b": 3},
{"a": [1, 3], "b": 3}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": [1, 2], "b": 3} in result
assert {"a": [1, 3], "b": 3} in result
def test_different_value_types(self):
"""Test with different value types"""
dict_list = [
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
{"str": "world", "int": 99, "float": 2.71, "bool": False}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_empty_dicts(self):
"""Test with empty dictionaries"""
dict_list: list[Any] = [
{},
{},
{"a": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {} in result
assert {"a": 1} in result
def test_single_key_dicts(self):
"""Test with single key dictionaries"""
dict_list = [
{"a": 1},
{"a": 1},
{"a": 2},
{"b": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 3
assert {"a": 1} in result
assert {"a": 2} in result
assert {"b": 1} in result
def test_many_keys(self):
"""Test with dictionaries containing many keys"""
dict1 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
dict2 = {"e": 5, "d": 4, "c": 3, "b": 2, "a": 1} # Same, different order
dict3 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 6} # Different value
dict_list = [dict1, dict2, dict3]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_numeric_keys(self):
"""Test with numeric keys"""
dict_list = [
{1: "one", 2: "two"},
{2: "two", 1: "one"},
{1: "one", 2: "three"}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_none_values(self):
"""Test with None values"""
dict_list = [
{"a": None, "b": 2},
{"a": None, "b": 2},
{"a": 1, "b": None}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": None, "b": 2} in result
assert {"a": 1, "b": None} in result
def test_mixed_key_types(self):
"""Test with mixed key types (string and numeric)"""
dict_list = [
{"a": 1, 1: "one"},
{1: "one", "a": 1},
{"a": 2, 1: "one"}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
@pytest.mark.parametrize("dict_list,expected_length", [
([{"a": 1}, {"a": 1}, {"a": 1}], 1),
([{"a": 1}, {"a": 2}, {"a": 3}], 3),
([{"a": 1, "b": 2}, {"b": 2, "a": 1}], 1),
([{}, {}], 1),
([{"x": [1, 2]}, {"x": [1, 2]}], 1),
([{"a": 1}, {"b": 2}, {"c": 3}], 3),
]) # pyright: ignore[reportUnknownArgumentType]
def test_parametrized_unique_dicts(self, dict_list: list[Any], expected_length: int):
"""Test make_unique_list_of_dicts with various input combinations"""
result = make_unique_list_of_dicts(dict_list)
assert len(result) == expected_length
assert isinstance(result, list)
def test_large_list(self):
"""Test with a large list of dictionaries"""
dict_list = [{"id": i % 100, "value": f"val_{i % 100}"} for i in range(1000)]
result = make_unique_list_of_dicts(dict_list)
# Should have 100 unique dicts (0-99)
assert len(result) == 100
def test_preserves_last_occurrence(self):
"""Test behavior with duplicate entries"""
# The function uses dict comprehension, which keeps last occurrence
dict_list = [
{"a": 1, "b": 2},
{"a": 3, "b": 4},
{"a": 1, "b": 2}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
# Just verify correct unique count, order may vary
def test_nested_dicts(self):
"""Test with nested dictionaries"""
dict_list = [
{"outer": {"inner": 1}},
{"outer": {"inner": 1}},
{"outer": {"inner": 2}}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_string_values_case_sensitive(self):
"""Test that string values are case-sensitive"""
dict_list = [
{"name": "John"},
{"name": "john"},
{"name": "JOHN"},
{"name": "John"}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 3
def test_boolean_values(self):
"""Test with boolean values"""
dict_list = [
{"flag": True, "count": 1},
{"count": 1, "flag": True},
{"flag": False, "count": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"flag": True, "count": 1} in result
assert {"flag": False, "count": 1} in result
# __END__
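The cases above pin down the contract for make_unique_list_of_dicts: dictionaries that differ only in key order count as duplicates, values may be nested or unhashable, mixed key types are allowed, and when duplicates occur the dict comprehension keeps the last occurrence. A minimal sketch under the assumption that deduplication keys off an order-independent fingerprint of each dict:

import hashlib
import json
from typing import Any


def make_unique_list_of_dicts_sketch(dict_list: list[dict[Any, Any]]) -> list[dict[Any, Any]]:
    """Drop duplicate dicts from a list; the last occurrence of a duplicate wins."""
    def canonical(obj: Any) -> Any:
        if isinstance(obj, dict):
            return {f"{type(k).__name__}:{k}": canonical(v) for k, v in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [canonical(v) for v in obj]
        return obj

    def fingerprint(entry: dict[Any, Any]) -> str:
        payload = json.dumps(canonical(entry), sort_keys=True, default=str)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    # Dict comprehension keyed by fingerprint: later duplicates overwrite earlier ones.
    return list({fingerprint(entry): entry for entry in dict_list}.values())


assert make_unique_list_of_dicts_sketch([{"a": 1, "b": 2}, {"b": 2, "a": 1}]) == [{"b": 2, "a": 1}]
assert make_unique_list_of_dicts_sketch([]) == []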

@@ -28,6 +28,7 @@ def tmp_log_path(tmp_path: Path) -> Path:
@pytest.fixture @pytest.fixture
def basic_log_settings() -> LogSettings: def basic_log_settings() -> LogSettings:
"""Basic log settings for testing""" """Basic log settings for testing"""
# Return a new dict each time to avoid state pollution
return { return {
"log_level_console": LoggingLevel.WARNING, "log_level_console": LoggingLevel.WARNING,
"log_level_file": LoggingLevel.DEBUG, "log_level_file": LoggingLevel.DEBUG,
@@ -308,4 +309,54 @@ class TestUpdateConsoleFormatter:
# Verify message was logged # Verify message was logged
assert "Test warning message" in caplog.text assert "Test warning message" in caplog.text
def test_log_console_format_option_set_to_none(
self, tmp_log_path: Path
):
"""Test that when log_console_format option is set to None, it uses ConsoleFormatSettings.ALL"""
# Save the original DEFAULT_LOG_SETTINGS to restore it after test
original_default = Log.DEFAULT_LOG_SETTINGS.copy()
try:
# Reset DEFAULT_LOG_SETTINGS to ensure clean state
Log.DEFAULT_LOG_SETTINGS = {
"log_level_console": Log.DEFAULT_LOG_LEVEL_CONSOLE,
"log_level_file": Log.DEFAULT_LOG_LEVEL_FILE,
"per_run_log": False,
"console_enabled": True,
"console_color_output_enabled": True,
"console_format_type": ConsoleFormatSettings.ALL,
"add_start_info": True,
"add_end_info": False,
"log_queue": None,
}
# Create a fresh settings dict with console_format_type explicitly set to None
settings: LogSettings = {
"log_level_console": LoggingLevel.WARNING,
"log_level_file": LoggingLevel.DEBUG,
"per_run_log": False,
"console_enabled": True,
"console_color_output_enabled": False,
"console_format_type": None, # type: ignore
"add_start_info": False,
"add_end_info": False,
"log_queue": None,
}
# Verify that None is explicitly set in the input
assert settings['console_format_type'] is None
log = Log(
log_path=tmp_log_path,
log_name="test_log",
log_settings=settings
)
# Verify that None was replaced with ConsoleFormatSettings.ALL
# The Log class should replace None with the default value (ALL)
assert log.log_settings['console_format_type'] == ConsoleFormatSettings.ALL
finally:
# Restore original DEFAULT_LOG_SETTINGS
Log.DEFAULT_LOG_SETTINGS = original_default
# __END__ # __END__
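The new test above asserts that passing console_format_type as None no longer raises and is normalised to ConsoleFormatSettings.ALL when the Log instance builds its settings. A standalone sketch of that normalisation step, with illustrative names standing in for the library's types:

from enum import Flag, auto
from typing import Any


class ConsoleFormatSketch(Flag):
    """Illustrative stand-in for ConsoleFormatSettings."""
    LEVEL = auto()
    TIME = auto()
    NAME = auto()
    ALL = LEVEL | TIME | NAME


def normalise_console_format(settings: dict[str, Any]) -> dict[str, Any]:
    """Replace a missing or None console format with the ALL default."""
    merged = dict(settings)
    if merged.get("console_format_type") is None:
        merged["console_format_type"] = ConsoleFormatSketch.ALL
    return merged


normalised = normalise_console_format({"console_format_type": None})
assert normalised["console_format_type"] is ConsoleFormatSketch.ALL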

@@ -2,11 +2,10 @@
PyTest: requests_handling/caller PyTest: requests_handling/caller
""" """
from typing import Any
from unittest.mock import Mock, patch from unittest.mock import Mock, patch
import pytest import pytest
import requests import requests
from corelibs.requests_handling.caller import Caller from corelibs.requests_handling.caller import Caller, ErrorResponse, ProxyConfig
class TestCallerInit: class TestCallerInit:
@@ -21,13 +20,17 @@ class TestCallerInit:
assert caller.timeout == 20 assert caller.timeout == 20
assert caller.verify is True assert caller.verify is True
assert caller.proxy is None assert caller.proxy is None
assert caller.cafile is None assert caller.ca_file is None
def test_init_with_all_params(self): def test_init_with_all_params(self):
"""Test Caller initialization with all parameters""" """Test Caller initialization with all parameters"""
header = {"Authorization": "Bearer token", "Content-Type": "application/json"} header = {"Authorization": "Bearer token", "Content-Type": "application/json"}
proxy = {"http": "http://proxy.example.com:8080", "https": "https://proxy.example.com:8080"} proxy: ProxyConfig = {
caller = Caller(header=header, verify=False, timeout=30, proxy=proxy) "type": "socks5",
"host": "proxy.example.com:8080",
"port": "8080"
}
caller = Caller(header=header, timeout=30, proxy=proxy, verify=False)
assert caller.headers == header assert caller.headers == header
assert caller.timeout == 30 assert caller.timeout == 30
@@ -58,7 +61,7 @@ class TestCallerInit:
ca_file_path = "/path/to/ca/cert.pem" ca_file_path = "/path/to/ca/cert.pem"
caller = Caller(header={}, ca_file=ca_file_path) caller = Caller(header={}, ca_file=ca_file_path)
assert caller.cafile == ca_file_path assert caller.ca_file == ca_file_path
class TestCallerGet: class TestCallerGet:
@@ -81,7 +84,8 @@ class TestCallerGet:
headers={"Authorization": "Bearer token"}, headers={"Authorization": "Bearer token"},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.get') @patch('corelibs.requests_handling.caller.requests.get')
@@ -101,7 +105,8 @@ class TestCallerGet:
headers={}, headers={},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.get') @patch('corelibs.requests_handling.caller.requests.get')
@@ -134,7 +139,11 @@ class TestCallerGet:
mock_response = Mock(spec=requests.Response) mock_response = Mock(spec=requests.Response)
mock_get.return_value = mock_response mock_get.return_value = mock_response
proxy = {"http": "http://proxy.example.com:8080"} proxy: ProxyConfig = {
"type": "socks5",
"host": "proxy.example.com:8080",
"port": "8080"
}
caller = Caller(header={}, proxy=proxy) caller = Caller(header={}, proxy=proxy)
caller.get("https://api.example.com/data") caller.get("https://api.example.com/data")
@@ -142,40 +151,46 @@ class TestCallerGet:
assert mock_get.call_args[1]["proxies"] == proxy assert mock_get.call_args[1]["proxies"] == proxy
@patch('corelibs.requests_handling.caller.requests.get') @patch('corelibs.requests_handling.caller.requests.get')
def test_get_invalid_schema_returns_none(self, mock_get: Mock, capsys: Any): def test_get_invalid_schema_returns_none(self, mock_get: Mock):
"""Test GET request with invalid URL schema returns None""" """Test GET request with invalid URL schema returns ErrorResponse"""
mock_get.side_effect = requests.exceptions.InvalidSchema("Invalid URL") mock_get.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
caller = Caller(header={}) caller = Caller(header={})
response = caller.get("invalid://example.com") response = caller.get("invalid://example.com")
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 200
assert "Invalid URL during 'get'" in captured.out assert "Invalid URL during 'get'" in response.message
assert response.action == "get"
assert response.url == "invalid://example.com"
@patch('corelibs.requests_handling.caller.requests.get') @patch('corelibs.requests_handling.caller.requests.get')
def test_get_timeout_returns_none(self, mock_get: Mock, capsys: Any): def test_get_timeout_returns_none(self, mock_get: Mock):
"""Test GET request timeout returns None""" """Test GET request timeout returns ErrorResponse"""
mock_get.side_effect = requests.exceptions.ReadTimeout("Timeout") mock_get.side_effect = requests.exceptions.ReadTimeout("Timeout")
caller = Caller(header={}) caller = Caller(header={})
response = caller.get("https://api.example.com/data") response = caller.get("https://api.example.com/data")
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 300
assert "Timeout (20s) during 'get'" in captured.out assert "Timeout (20s) during 'get'" in response.message
assert response.action == "get"
assert response.url == "https://api.example.com/data"
@patch('corelibs.requests_handling.caller.requests.get') @patch('corelibs.requests_handling.caller.requests.get')
def test_get_connection_error_returns_none(self, mock_get: Mock, capsys: Any): def test_get_connection_error_returns_none(self, mock_get: Mock):
"""Test GET request connection error returns None""" """Test GET request connection error returns ErrorResponse"""
mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed") mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed")
caller = Caller(header={}) caller = Caller(header={})
response = caller.get("https://api.example.com/data") response = caller.get("https://api.example.com/data")
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 400
assert "Connection error during 'get'" in captured.out assert "Connection error during 'get'" in response.message
assert response.action == "get"
assert response.url == "https://api.example.com/data"
class TestCallerPost: class TestCallerPost:
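The rewritten GET error tests show the new contract for Caller: instead of None, failures come back as an ErrorResponse carrying a numeric code (200 for an invalid schema, 300 for a timeout, 400 for a connection error in these tests), a message, the action name, and the URL. A hedged sketch of that exception-to-ErrorResponse mapping around requests, with the container fields taken from the assertions rather than from the library source:

from dataclasses import dataclass

import requests


@dataclass
class ErrorResponseSketch:
    """Illustrative error container with the fields the tests assert on."""
    code: int
    message: str
    action: str
    url: str


def get_with_error_mapping(
    url: str, headers: dict[str, str], timeout: int = 20
) -> "requests.Response | ErrorResponseSketch":
    """GET a URL and convert known request failures into an ErrorResponseSketch."""
    try:
        return requests.get(url, headers=headers, timeout=timeout)
    except requests.exceptions.InvalidSchema as exc:
        return ErrorResponseSketch(200, f"Invalid URL during 'get': {exc}", "get", url)
    except requests.exceptions.ReadTimeout as exc:
        return ErrorResponseSketch(300, f"Timeout ({timeout}s) during 'get': {exc}", "get", url)
    except requests.exceptions.ConnectionError as exc:
        return ErrorResponseSketch(400, f"Connection error during 'get': {exc}", "get", url)

Calling code then branches on isinstance(response, ErrorResponseSketch) instead of checking for None, which is exactly the pattern the updated assertions use.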
@@ -200,7 +215,8 @@ class TestCallerPost:
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.post') @patch('corelibs.requests_handling.caller.requests.post')
@@ -234,40 +250,46 @@ class TestCallerPost:
assert mock_post.call_args[1]["json"] == data assert mock_post.call_args[1]["json"] == data
@patch('corelibs.requests_handling.caller.requests.post') @patch('corelibs.requests_handling.caller.requests.post')
def test_post_invalid_schema_returns_none(self, mock_post: Mock, capsys: Any): def test_post_invalid_schema_returns_none(self, mock_post: Mock):
"""Test POST request with invalid URL schema returns None""" """Test POST request with invalid URL schema returns ErrorResponse"""
mock_post.side_effect = requests.exceptions.InvalidSchema("Invalid URL") mock_post.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
caller = Caller(header={}) caller = Caller(header={})
response = caller.post("invalid://example.com", data={"test": "data"}) response = caller.post("invalid://example.com", data={"test": "data"})
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 200
assert "Invalid URL during 'post'" in captured.out assert "Invalid URL during 'post'" in response.message
assert response.action == "post"
assert response.url == "invalid://example.com"
@patch('corelibs.requests_handling.caller.requests.post') @patch('corelibs.requests_handling.caller.requests.post')
def test_post_timeout_returns_none(self, mock_post: Mock, capsys: Any): def test_post_timeout_returns_none(self, mock_post: Mock):
"""Test POST request timeout returns None""" """Test POST request timeout returns ErrorResponse"""
mock_post.side_effect = requests.exceptions.ReadTimeout("Timeout") mock_post.side_effect = requests.exceptions.ReadTimeout("Timeout")
caller = Caller(header={}) caller = Caller(header={})
response = caller.post("https://api.example.com/data", data={"test": "data"}) response = caller.post("https://api.example.com/data", data={"test": "data"})
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 300
assert "Timeout (20s) during 'post'" in captured.out assert "Timeout (20s) during 'post'" in response.message
assert response.action == "post"
assert response.url == "https://api.example.com/data"
@patch('corelibs.requests_handling.caller.requests.post') @patch('corelibs.requests_handling.caller.requests.post')
def test_post_connection_error_returns_none(self, mock_post: Mock, capsys: Any): def test_post_connection_error_returns_none(self, mock_post: Mock):
"""Test POST request connection error returns None""" """Test POST request connection error returns ErrorResponse"""
mock_post.side_effect = requests.exceptions.ConnectionError("Connection failed") mock_post.side_effect = requests.exceptions.ConnectionError("Connection failed")
caller = Caller(header={}) caller = Caller(header={})
response = caller.post("https://api.example.com/data", data={"test": "data"}) response = caller.post("https://api.example.com/data", data={"test": "data"})
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 400
assert "Connection error during 'post'" in captured.out assert "Connection error during 'post'" in response.message
assert response.action == "post"
assert response.url == "https://api.example.com/data"
class TestCallerPut: class TestCallerPut:
@@ -292,7 +314,8 @@ class TestCallerPut:
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.put') @patch('corelibs.requests_handling.caller.requests.put')
@@ -311,16 +334,18 @@ class TestCallerPut:
assert mock_put.call_args[1]["params"] == params assert mock_put.call_args[1]["params"] == params
@patch('corelibs.requests_handling.caller.requests.put') @patch('corelibs.requests_handling.caller.requests.put')
def test_put_timeout_returns_none(self, mock_put: Mock, capsys: Any): def test_put_timeout_returns_none(self, mock_put: Mock):
"""Test PUT request timeout returns None""" """Test PUT request timeout returns ErrorResponse"""
mock_put.side_effect = requests.exceptions.ReadTimeout("Timeout") mock_put.side_effect = requests.exceptions.ReadTimeout("Timeout")
caller = Caller(header={}) caller = Caller(header={})
response = caller.put("https://api.example.com/data/1", data={"test": "data"}) response = caller.put("https://api.example.com/data/1", data={"test": "data"})
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 300
assert "Timeout (20s) during 'put'" in captured.out assert "Timeout (20s) during 'put'" in response.message
assert response.action == "put"
assert response.url == "https://api.example.com/data/1"
class TestCallerPatch: class TestCallerPatch:
@@ -345,7 +370,8 @@ class TestCallerPatch:
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.patch') @patch('corelibs.requests_handling.caller.requests.patch')
@@ -364,16 +390,18 @@ class TestCallerPatch:
assert mock_patch.call_args[1]["params"] == params assert mock_patch.call_args[1]["params"] == params
@patch('corelibs.requests_handling.caller.requests.patch') @patch('corelibs.requests_handling.caller.requests.patch')
def test_patch_connection_error_returns_none(self, mock_patch: Mock, capsys: Any): def test_patch_connection_error_returns_none(self, mock_patch: Mock):
"""Test PATCH request connection error returns None""" """Test PATCH request connection error returns ErrorResponse"""
mock_patch.side_effect = requests.exceptions.ConnectionError("Connection failed") mock_patch.side_effect = requests.exceptions.ConnectionError("Connection failed")
caller = Caller(header={}) caller = Caller(header={})
response = caller.patch("https://api.example.com/data/1", data={"test": "data"}) response = caller.patch("https://api.example.com/data/1", data={"test": "data"})
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 400
assert "Connection error during 'patch'" in captured.out assert "Connection error during 'patch'" in response.message
assert response.action == "patch"
assert response.url == "https://api.example.com/data/1"
class TestCallerDelete: class TestCallerDelete:
@@ -396,7 +424,8 @@ class TestCallerDelete:
headers={"Authorization": "Bearer token"}, headers={"Authorization": "Bearer token"},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.delete') @patch('corelibs.requests_handling.caller.requests.delete')
@@ -414,16 +443,18 @@ class TestCallerDelete:
assert mock_delete.call_args[1]["params"] == params assert mock_delete.call_args[1]["params"] == params
@patch('corelibs.requests_handling.caller.requests.delete') @patch('corelibs.requests_handling.caller.requests.delete')
def test_delete_invalid_schema_returns_none(self, mock_delete: Mock, capsys: Any): def test_delete_invalid_schema_returns_none(self, mock_delete: Mock):
"""Test DELETE request with invalid URL schema returns None""" """Test DELETE request with invalid URL schema returns ErrorResponse"""
mock_delete.side_effect = requests.exceptions.InvalidSchema("Invalid URL") mock_delete.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
caller = Caller(header={}) caller = Caller(header={})
response = caller.delete("invalid://example.com/data/1") response = caller.delete("invalid://example.com/data/1")
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert response.code == 200
assert "Invalid URL during 'delete'" in captured.out assert "Invalid URL during 'delete'" in response.message
assert response.action == "delete"
assert response.url == "invalid://example.com/data/1"
class TestCallerParametrized: class TestCallerParametrized:
@@ -492,7 +523,7 @@ class TestCallerParametrized:
]) ])
@patch('corelibs.requests_handling.caller.requests.get') @patch('corelibs.requests_handling.caller.requests.get')
def test_exception_handling( def test_exception_handling(
self, mock_get: Mock, exception_class: type, expected_message: str, capsys: Any self, mock_get: Mock, exception_class: type, expected_message: str
): ):
"""Test exception handling for all exception types""" """Test exception handling for all exception types"""
mock_get.side_effect = exception_class("Test error") mock_get.side_effect = exception_class("Test error")
@@ -500,9 +531,8 @@ class TestCallerParametrized:
caller = Caller(header={}) caller = Caller(header={})
response = caller.get("https://api.example.com/data") response = caller.get("https://api.example.com/data")
assert response is None assert isinstance(response, ErrorResponse)
captured = capsys.readouterr() assert expected_message in response.message
assert expected_message in captured.out
class TestCallerIntegration: class TestCallerIntegration:
@@ -599,7 +629,8 @@ class TestCallerEdgeCases:
headers={}, headers={},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.post') @patch('corelibs.requests_handling.caller.requests.post')
@@ -659,7 +690,8 @@ class TestCallerEdgeCases:
headers={}, headers={},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
@patch('corelibs.requests_handling.caller.requests.get') @patch('corelibs.requests_handling.caller.requests.get')
@@ -679,7 +711,8 @@ class TestCallerEdgeCases:
headers={}, headers={},
timeout=20, timeout=20,
verify=True, verify=True,
proxies=None proxies=None,
cert=None
) )
def test_timeout_zero(self): def test_timeout_zero(self):
@@ -730,9 +763,10 @@ class TestCallerProxyHandling:
mock_response = Mock(spec=requests.Response) mock_response = Mock(spec=requests.Response)
mock_get.return_value = mock_response mock_get.return_value = mock_response
proxy = { proxy: ProxyConfig = {
"http": "http://proxy.example.com:8080", "type": "socks5",
"https": "https://proxy.example.com:8080" "host": "proxy.example.com:8080",
"port": "8080"
} }
caller = Caller(header={}, proxy=proxy) caller = Caller(header={}, proxy=proxy)
caller.get("https://api.example.com/data") caller.get("https://api.example.com/data")
@@ -746,9 +780,10 @@ class TestCallerProxyHandling:
mock_response = Mock(spec=requests.Response) mock_response = Mock(spec=requests.Response)
mock_post.return_value = mock_response mock_post.return_value = mock_response
proxy = { proxy: ProxyConfig = {
"http": "http://user:pass@proxy.example.com:8080", "type": "socks5",
"https": "https://user:pass@proxy.example.com:8080" "host": "proxy.example.com:8080",
"port": "8080"
} }
caller = Caller(header={}, proxy=proxy) caller = Caller(header={}, proxy=proxy)
caller.post("https://api.example.com/data", data={"test": "data"}) caller.post("https://api.example.com/data", data={"test": "data"})
@@ -789,7 +824,7 @@ class TestCallerResponseHandling:
caller = Caller(header={}) caller = Caller(header={})
response = caller.get("https://api.example.com/data") response = caller.get("https://api.example.com/data")
assert response is not None assert not isinstance(response, ErrorResponse)
assert response.status_code == 200 assert response.status_code == 200
assert response.text == "Success" assert response.text == "Success"
assert response.json() == {"status": "ok"} assert response.json() == {"status": "ok"}
@@ -805,7 +840,7 @@ class TestCallerResponseHandling:
caller = Caller(header={}) caller = Caller(header={})
response = caller.get("https://api.example.com/data") response = caller.get("https://api.example.com/data")
assert response is not None assert not isinstance(response, ErrorResponse)
assert response.status_code == status_code assert response.status_code == status_code
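Across the proxy handling tests, the proxy is constructed as a ProxyConfig typed dict with type, host and port fields and is expected to be handed to requests unchanged. As an illustration of that shape only (the real ProxyConfig definition lives in the caller module and may carry more fields):

from typing import TypedDict


class ProxyConfigSketch(TypedDict):
    """Illustrative stand-in for the ProxyConfig typed dict used by Caller."""
    type: str   # proxy scheme, e.g. "socks5"
    host: str   # proxy host (the tests include the port in this string too)
    port: str   # proxy port, kept as a string in the tests


proxy: ProxyConfigSketch = {
    "type": "socks5",
    "host": "proxy.example.com:8080",
    "port": "8080",
}
assert proxy["port"] == "8080"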