Compare commits

...

23 Commits

Author SHA1 Message Date
Clemens Schwaighofer
d098eb58f3 v0.48.0: Update Caller class with better error handling and reporting 2026-01-30 18:20:21 +09:00
Clemens Schwaighofer
5319a059ad Update the caller class
- has now ErrorResponse return values instead of None on errors
- changed parameter cafile to ca_file and its position in the init method
- Proxy has ProxyConfig Typed Dict format

Tests updates to reflect those changes
2026-01-30 18:17:41 +09:00
Clemens Schwaighofer
163b8c4018 Update caller Class, backport from github manage script 2026-01-30 17:32:30 +09:00
Clemens Schwaighofer
6322b95068 v0.47.0: fingerprint update with fallback for str/int index overlaps 2026-01-27 17:15:32 +09:00
Clemens Schwaighofer
715ed1f9c2 Docblocks update in in iterator handling fingerprint 2026-01-27 17:14:31 +09:00
Clemens Schwaighofer
82a759dd21 Fix fingerprint with mixed int and str keys
Create a fallback hash function to handle mixed key types in dictionaries
and lists, ensuring consistent hashing across different data structures.

Fallback called is prefixed with "HO_" to indicate its usage.
2026-01-27 15:59:38 +09:00
Clemens Schwaighofer
fe913608c4 Fix iteration list helpers dict list type 2026-01-27 14:52:11 +09:00
Clemens Schwaighofer
79f9c5d1c6 iterator list helpers tests run cases updated 2026-01-27 14:51:25 +09:00
Clemens Schwaighofer
3d091129e2 v0.46.0: Add unique list helper function 2026-01-27 14:43:35 +09:00
Clemens Schwaighofer
1a978f786d Add a list helper to create unique list of dictionaries and tests for it. 2026-01-27 14:42:19 +09:00
Clemens Schwaighofer
51669d3c5f Settings loader test-run add boolean convert check test 2026-01-23 18:07:52 +09:00
Clemens Schwaighofer
d128dcb479 v0.45.1: Fix Log with log console format set to None 2026-01-23 15:16:38 +09:00
Clemens Schwaighofer
84286593f6 Log fix bug where log consosle format set to None would throw an exception
Also add prefix "[SettingsLoader] " to print statements in SettingsLoader if we do not write to log
2026-01-23 15:14:31 +09:00
Clemens Schwaighofer
8d97f09e5e v0.45.0: Log add function to get console formatter flags set 2026-01-23 11:37:02 +09:00
Clemens Schwaighofer
2748bc19be Log, add get console formatter method
Returns current flags set for console formatter
2026-01-23 11:33:38 +09:00
Clemens Schwaighofer
0b3c8fc774 v0.44.2: Move the compiled regex into dedicated file 2026-01-09 16:17:27 +09:00
Clemens Schwaighofer
7da18e0f00 Moved the compiled regex patterns to a new file regex_constants_compiled
So we do not force the compiled build if not needed
2026-01-09 16:15:38 +09:00
Clemens Schwaighofer
49e38081ad v0.44.1: add pre compiled regexes 2026-01-08 15:16:26 +09:00
Clemens Schwaighofer
a14f993a31 Add pre-compiled REGEX entries to the regex pattern file
compiled ones hare prefixed with COMPILED_
2026-01-08 15:14:48 +09:00
Clemens Schwaighofer
ae938f9909 v0.44.0: Add more REGEX patters for email matching 2026-01-08 14:59:49 +09:00
Clemens Schwaighofer
f91e0bb93a Add new regex constants for email handling and update related tests 2026-01-08 14:58:14 +09:00
Clemens Schwaighofer
d3f61005cf v0.43.4: Fix for config loader with empty to split into lists values 2026-01-06 10:04:03 +09:00
Clemens Schwaighofer
2923a3e88b Fix settings loader to return empty list when splitting empty string value 2026-01-06 09:58:21 +09:00
21 changed files with 1518 additions and 232 deletions

View File

@@ -1,7 +1,7 @@
# MARK: Project info
[project]
name = "corelibs"
version = "0.43.3"
version = "0.48.0"
description = "Collection of utils for Python scripts"
readme = "README.md"
requires-python = ">=3.13"

View File

@@ -19,9 +19,26 @@ def compile_re(reg: str) -> re.Pattern[str]:
# email regex
EMAIL_BASIC_REGEX: str = r"""
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
SUB_EMAIL_BASIC_REGEX: str = r"""
[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}
"""
EMAIL_BASIC_REGEX = rf"^{SUB_EMAIL_BASIC_REGEX}$"
# name + email regex for email sending type like "foo bar" <email@mail.com>
NAME_EMAIL_SIMPLE_REGEX = r"""
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
<(?P<email3>[^>]+)>|
(?P<email4>[^\s<>]+))\s*$
"""
# name + email with the basic regex set
NAME_EMAIL_BASIC_REGEX = rf"""
^\s*(?:
"(?P<name1>[^"]+)"\s*<(?P<email1>{SUB_EMAIL_BASIC_REGEX})>|
(?P<name2>.+?)\s*<(?P<email2>{SUB_EMAIL_BASIC_REGEX})>|
<(?P<email3>{SUB_EMAIL_BASIC_REGEX})>|
(?P<email4>{SUB_EMAIL_BASIC_REGEX})
)\s*$
"""
# Domain regex with localhost
DOMAIN_WITH_LOCALHOST_REGEX: str = r"""

View File

@@ -0,0 +1,23 @@
"""
List of regex compiled strings that can be used
"""
from corelibs.check_handling.regex_constants import (
compile_re,
EMAIL_BASIC_REGEX,
NAME_EMAIL_SIMPLE_REGEX,
NAME_EMAIL_BASIC_REGEX,
DOMAIN_WITH_LOCALHOST_REGEX,
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
DOMAIN_REGEX
)
# all above in compiled form
COMPILED_EMAIL_BASIC_REGEX = compile_re(EMAIL_BASIC_REGEX)
COMPILED_NAME_EMAIL_SIMPLE_REGEX = compile_re(NAME_EMAIL_SIMPLE_REGEX)
COMPILED_NAME_EMAIL_BASIC_REGEX = compile_re(NAME_EMAIL_BASIC_REGEX)
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
COMPILED_DOMAIN_REGEX = compile_re(DOMAIN_REGEX)
# __END__

View File

@@ -173,10 +173,13 @@ class SettingsLoader:
args_overrride.append(key)
if skip:
continue
settings[config_id][key] = [
__value.replace(" ", "")
for __value in settings[config_id][key].split(split_char)
]
if settings[config_id][key]:
settings[config_id][key] = [
__value.replace(" ", "")
for __value in settings[config_id][key].split(split_char)
]
else:
settings[config_id][key] = []
except KeyError as e:
raise ValueError(self.__print(
f"[!] Cannot read [{config_id}] block because the entry [{e}] could not be found",
@@ -574,7 +577,7 @@ class SettingsLoader:
self.log.logger.log(Log.get_log_level_int(level), msg, stacklevel=2)
if self.log is None or self.always_print:
if print_error:
print(msg)
print(f"[SettingsLoader] {msg}")
if level == 'ERROR':
# remove any prefix [!] for error message list
self.__error_msg.append(msg.replace('[!] ', '').strip())

View File

@@ -4,6 +4,8 @@ Send email wrapper
import smtplib
from email.message import EmailMessage
from email.header import Header
from email.utils import formataddr, parseaddr
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from corelibs.logging_handling.log import Logger
@@ -133,21 +135,30 @@ class SendEmail:
_subject = template["subject"]
_body = template["body"]
for key, value in replace.items():
_subject = _subject.replace(f"{{{{{key}}}}}", value)
_body = _body.replace(f"{{{{{key}}}}}", value)
placeholder = f"{{{{{key}}}}}"
_subject = _subject.replace(placeholder, value)
_body = _body.replace(placeholder, value)
name, addr = parseaddr(from_email)
if name:
# Encode the name part with MIME encoding
encoded_name = str(Header(name, 'utf-8'))
from_email_encoded = formataddr((encoded_name, addr))
else:
from_email_encoded = from_email
# create a simple email and add subhect, from email
msg_email = EmailMessage()
# msg.set_content(_body, charset='utf-8', cte='quoted-printable')
msg_email.set_content(_body, charset="utf-8")
msg_email["Subject"] = _subject
msg_email["From"] = from_email
msg_email["From"] = from_email_encoded
# push to array for sening
msg.append(msg_email)
return msg
def send_email_list(
self,
email: list[EmailMessage], receivers: list[str],
emails: list[EmailMessage],
receivers: list[str],
combined_send: bool | None = None,
test_only: bool | None = None
):
@@ -170,18 +181,27 @@ class SendEmail:
smtp = smtplib.SMTP(smtp_host)
except ConnectionRefusedError as e:
self.log.error("Could not open SMTP connection to: %s, %s", smtp_host, e)
# prepare receiver list
receivers_encoded: list[str] = []
for __receiver in receivers:
to_name, to_addr = parseaddr(__receiver)
if to_name:
# Encode the name part with MIME encoding
encoded_to_name = str(Header(to_name, 'utf-8'))
receivers_encoded.append(formataddr((encoded_to_name, to_addr)))
else:
receivers_encoded.append(__receiver)
# loop over messages and then over recievers
for msg in email:
for msg in emails:
if combined_send is True:
msg["To"] = ", ".join(receivers)
msg["To"] = ", ".join(receivers_encoded)
if not self.settings.get('test'):
if smtp is not None:
smtp.send_message(msg, msg["From"], receivers)
smtp.send_message(msg, msg["From"], receivers_encoded)
else:
self.log.info(f"[EMAIL] Test, not sending email\n{msg}")
else:
for receiver in receivers:
# send to
for receiver in receivers_encoded:
self.log.debug(f"===> Send to: {receiver}")
if "To" in msg:
msg.replace_header("To", receiver)

View File

@@ -4,11 +4,38 @@ Various dictionary, object and list hashers
import json
import hashlib
from typing import Any
from typing import Any, cast, Sequence
def hash_object(obj: Any) -> str:
"""
RECOMMENDED for new use
Create a hash for any dict or list with mixed key types
Arguments:
obj {Any} -- _description_
Returns:
str -- _description_
"""
def normalize(o: Any) -> Any:
if isinstance(o, dict):
# Sort by repr of keys to handle mixed types (str, int, etc.)
o = cast(dict[Any, Any], o)
return tuple(sorted((repr(k), normalize(v)) for k, v in o.items()))
if isinstance(o, (list, tuple)):
o = cast(Sequence[Any], o)
return tuple(normalize(item) for item in o)
return repr(o)
normalized = normalize(obj)
return hashlib.sha256(str(normalized).encode()).hexdigest()
def dict_hash_frozen(data: dict[Any, Any]) -> int:
"""
NOT RECOMMENDED, use dict_hash_crc or hash_object instead
If used, DO NOT CHANGE
hash a dict via freeze
Args:
@@ -22,18 +49,25 @@ def dict_hash_frozen(data: dict[Any, Any]) -> int:
def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str:
"""
Create a sha256 hash over dict
LEGACY METHOD, must be kept for fallback, if used by other code, DO NOT CHANGE
Create a sha256 hash over dict or list
alternative for
dict_hash_frozen
Args:
data (dict | list): _description_
data (dict[Any, Any] | list[Any]): _description_
Returns:
str: _description_
str: sha256 hash, prefiex with HO_ if fallback used
"""
return hashlib.sha256(
json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8')
).hexdigest()
try:
return hashlib.sha256(
# IT IS IMPORTANT THAT THE BELOW CALL STAYS THE SAME AND DOES NOT CHANGE OR WE WILL GET DIFFERENT HASHES
# separators=(',', ':') to get rid of spaces, but if this is used the hash will be different, DO NOT ADD
json.dumps(data, sort_keys=True, ensure_ascii=True, default=str).encode('utf-8')
).hexdigest()
except TypeError:
# Fallback tod different hasher, will return DIFFERENT hash than above, so only usable in int/str key mixes
return "HO_" + hash_object(data)
# __END__

View File

@@ -2,6 +2,7 @@
List type helpers
"""
import json
from typing import Any, Sequence
@@ -44,4 +45,31 @@ def is_list_in_list(
# Get the difference and extract just the values
return [item for item, _ in set_a - set_b]
def make_unique_list_of_dicts(dict_list: list[Any]) -> list[Any]:
"""
Create a list of unique dictionary entries
Arguments:
dict_list {list[Any]} -- _description_
Returns:
list[Any] -- _description_
"""
try:
# try json dumps, can fail with int and str index types
return list(
{
json.dumps(d, sort_keys=True, ensure_ascii=True, separators=(',', ':')): d
for d in dict_list
}.values()
)
except TypeError:
# Fallback for non-serializable entries, slow but works
unique: list[Any] = []
for d in dict_list:
if d not in unique:
unique.append(d)
return unique
# __END__

View File

@@ -602,9 +602,9 @@ class Log(LogParent):
__setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
default_log_settings[__log_entry] = __setting
# check console log type
default_log_settings['console_format_type'] = cast('ConsoleFormat', log_settings.get(
'console_format_type', self.DEFAULT_LOG_SETTINGS['console_format_type']
))
if (console_format_type := log_settings.get('console_format_type')) is None:
console_format_type = self.DEFAULT_LOG_SETTINGS['console_format_type']
default_log_settings['console_format_type'] = cast('ConsoleFormat', console_format_type)
# check log queue
__setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue'])
if __setting is not None:
@@ -774,6 +774,16 @@ class Log(LogParent):
self.__set_console_formatter(console_format_type)
)
def get_console_formatter(self) -> ConsoleFormat:
"""
Get the current console formatter, this the settings type
Note that if eg "ALL" is set it will return the combined information but not the ALL flag name itself
Returns:
ConsoleFormat -- _description_
"""
return self.log_settings['console_format_type']
# MARK: console handler
def __create_console_handler(
self, handler_name: str,

View File

@@ -3,32 +3,61 @@ requests lib interface
V2 call type
"""
from typing import Any
import warnings
from typing import Any, TypedDict, cast
import requests
# to hide the verfiy warnings because of the bad SSL settings from Netskope, Akamai, etc
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
from requests import exceptions
class ErrorResponse:
"""
Error response structure. This is returned if a request could not be completed
"""
def __init__(
self,
code: int,
message: str,
action: str,
url: str,
exception: exceptions.InvalidSchema | exceptions.ReadTimeout | exceptions.ConnectionError | None = None
) -> None:
self.code = code
self.message = message
self.action = action
self.url = url
self.exception_name = type(exception).__name__ if exception is not None else None
self.exception_trace = exception if exception is not None else None
class ProxyConfig(TypedDict):
"""
Socks proxy settings
"""
type: str
host: str
port: str
class Caller:
"""_summary_"""
"""
requests lib interface
"""
def __init__(
self,
header: dict[str, str],
verify: bool = True,
timeout: int = 20,
proxy: dict[str, str] | None = None,
proxy: ProxyConfig | None = None,
verify: bool = True,
ca_file: str | None = None
):
self.headers = header
self.timeout: int = timeout
self.cafile = ca_file
self.ca_file = ca_file
self.verify = verify
self.proxy = proxy
self.proxy = cast(dict[str, str], proxy) if proxy is not None else None
def __timeout(self, timeout: int | None) -> int:
if timeout is not None:
if timeout is not None and timeout >= 0:
return timeout
return self.timeout
@@ -39,7 +68,7 @@ class Caller:
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | None:
) -> requests.Response | ErrorResponse:
"""
call wrapper, on error returns None
@@ -56,67 +85,96 @@ class Caller:
if data is None:
data = {}
try:
response = None
if action == "get":
response = requests.get(
return requests.get(
url,
params=params,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
proxies=self.proxy,
cert=self.ca_file
)
elif action == "post":
response = requests.post(
if action == "post":
return requests.post(
url,
params=params,
json=data,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
proxies=self.proxy,
cert=self.ca_file
)
elif action == "put":
response = requests.put(
if action == "put":
return requests.put(
url,
params=params,
json=data,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
proxies=self.proxy,
cert=self.ca_file
)
elif action == "patch":
response = requests.patch(
if action == "patch":
return requests.patch(
url,
params=params,
json=data,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
proxies=self.proxy,
cert=self.ca_file
)
elif action == "delete":
response = requests.delete(
if action == "delete":
return requests.delete(
url,
params=params,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
proxies=self.proxy,
cert=self.ca_file
)
return response
except requests.exceptions.InvalidSchema as e:
print(f"Invalid URL during '{action}' for {url}:\n\t{e}")
return None
except requests.exceptions.ReadTimeout as e:
print(f"Timeout ({self.timeout}s) during '{action}' for {url}:\n\t{e}")
return None
except requests.exceptions.ConnectionError as e:
print(f"Connection error during '{action}' for {url}:\n\t{e}")
return None
return ErrorResponse(
100,
f"Unsupported action '{action}'",
action,
url
)
except exceptions.InvalidSchema as e:
return ErrorResponse(
200,
f"Invalid URL during '{action}' for {url}",
action,
url,
e
)
except exceptions.ReadTimeout as e:
return ErrorResponse(
300,
f"Timeout ({self.timeout}s) during '{action}' for {url}",
action,
url,
e
)
except exceptions.ConnectionError as e:
return ErrorResponse(
400,
f"Connection error during '{action}' for {url}",
action,
url,
e
)
def get(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
def get(
self,
url: str,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
"""
get data
@@ -127,11 +185,15 @@ class Caller:
Returns:
requests.Response: _description_
"""
return self.__call('get', url, params=params)
return self.__call('get', url, params=params, timeout=timeout)
def post(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
) -> requests.Response | None:
self,
url: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
"""
post data
@@ -143,11 +205,15 @@ class Caller:
Returns:
requests.Response | None: _description_
"""
return self.__call('post', url, data, params)
return self.__call('post', url, data, params, timeout=timeout)
def put(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
) -> requests.Response | None:
self,
url: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
"""_summary_
Args:
@@ -158,11 +224,15 @@ class Caller:
Returns:
requests.Response | None: _description_
"""
return self.__call('put', url, data, params)
return self.__call('put', url, data, params, timeout=timeout)
def patch(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
) -> requests.Response | None:
self,
url: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
"""_summary_
Args:
@@ -173,9 +243,14 @@ class Caller:
Returns:
requests.Response | None: _description_
"""
return self.__call('patch', url, data, params)
return self.__call('patch', url, data, params, timeout=timeout)
def delete(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
def delete(
self,
url: str,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | ErrorResponse:
"""
delete
@@ -186,6 +261,6 @@ class Caller:
Returns:
requests.Response | None: _description_
"""
return self.__call('delete', url, params=params)
return self.__call('delete', url, params=params, timeout=timeout)
# __END__

View File

@@ -2,14 +2,28 @@
Test check andling for regex checks
"""
import re
from corelibs.check_handling.regex_constants import DOMAIN_WITH_LOCALHOST_REGEX
from corelibs_text_colors.text_colors import Colors
from corelibs.check_handling.regex_constants import (
compile_re, DOMAIN_WITH_LOCALHOST_REGEX, EMAIL_BASIC_REGEX, NAME_EMAIL_BASIC_REGEX, SUB_EMAIL_BASIC_REGEX
)
from corelibs.check_handling.regex_constants_compiled import (
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX, COMPILED_EMAIL_BASIC_REGEX,
COMPILED_NAME_EMAIL_SIMPLE_REGEX, COMPILED_NAME_EMAIL_BASIC_REGEX
)
NAME_EMAIL_SIMPLE_REGEX = r"""
^\s*(?:"(?P<name1>[^"]+)"\s*<(?P<email1>[^>]+)>|
(?P<name2>.+?)\s*<(?P<email2>[^>]+)>|
<(?P<email3>[^>]+)>|
(?P<email4>[^\s<>]+))\s*$
"""
def main():
def domain_test():
"""
Test regex checks
domain regex test
"""
print("=" * 30)
test_domains = [
"example.com",
"localhost",
@@ -18,7 +32,7 @@ def main():
"some-domain.org"
]
regex_domain_check = re.compile(DOMAIN_WITH_LOCALHOST_REGEX)
regex_domain_check = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
print(f"REGEX: {DOMAIN_WITH_LOCALHOST_REGEX}")
print(f"Check regex: {regex_domain_check.search('localhost')}")
@@ -29,6 +43,66 @@ def main():
print(f"Did not match: {domain}")
def email_test():
"""
email regex test
"""
print("=" * 30)
email_list = """
e@bar.com
<f@foobar.com>
"Master" <foobar@bar.com>
"not valid" not@valid.com
also not valid not@valid.com
some header <something@bar.com>
test master <master@master.com>
日本語 <japan@jp.net>
"ひほん カケ苦" <foo@bar.com>
single@entry.com
arsch@popsch.com
test open <open@open.com>
"""
print(f"REGEX: SUB_EMAIL_BASIC_REGEX: {SUB_EMAIL_BASIC_REGEX}")
print(f"REGEX: EMAIL_BASIC_REGEX: {EMAIL_BASIC_REGEX}")
print(f"REGEX: COMPILED_NAME_EMAIL_SIMPLE_REGEX: {COMPILED_NAME_EMAIL_SIMPLE_REGEX}")
print(f"REGEX: NAME_EMAIL_BASIC_REGEX: {NAME_EMAIL_BASIC_REGEX}")
basic_email = COMPILED_EMAIL_BASIC_REGEX
sub_basic_email = compile_re(SUB_EMAIL_BASIC_REGEX)
simple_name_email_regex = COMPILED_NAME_EMAIL_SIMPLE_REGEX
full_name_email_regex = COMPILED_NAME_EMAIL_BASIC_REGEX
for email in email_list.splitlines():
email = email.strip()
if not email:
continue
print(f">>> Testing: {email}")
if not basic_email.match(email):
print(f"{Colors.red}[EMAIL ] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[EMAIL ] Matched : {email}{Colors.reset}")
if not sub_basic_email.match(email):
print(f"{Colors.red}[SUB ] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[SUB ] Matched : {email}{Colors.reset}")
if not simple_name_email_regex.match(email):
print(f"{Colors.red}[SIMPLE] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[SIMPLE] Matched : {email}{Colors.reset}")
if not full_name_email_regex.match(email):
print(f"{Colors.red}[FULL ] No match: {email}{Colors.reset}")
else:
print(f"{Colors.green}[FULL ] Matched : {email}{Colors.reset}")
def main():
"""
Test regex checks
"""
domain_test()
email_test()
if __name__ == "__main__":
main()

View File

@@ -12,10 +12,12 @@ some_match_list=foo,bar
test_list=a,b,c,d f, g h
other_list=a|b|c|d|
third_list=xy|ab|df|fg
empty_list=
str_length=foobar
int_range=20
int_range_not_set=
int_range_not_set_empty_set=5
bool_var=True
#
match_target=foo
match_target_list=foo,bar,baz

View File

@@ -21,11 +21,6 @@ def main():
Main run
"""
value = "2025/1/1"
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
result = regex_c.search(value)
print(f"regex {regex_c} check against {value} -> {result}")
# for log testing
log = Log(
log_path=ROOT_PATH.joinpath(LOG_DIR, 'settings_loader.log'),
@@ -37,6 +32,11 @@ def main():
)
log.logger.info('Settings loader')
value = "2025/1/1"
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
result = regex_c.search(value)
log.info(f"regex {regex_c} check against {value} -> {result}")
sl = SettingsLoader(
{
'overload_from_args': 'OVERLOAD from ARGS',
@@ -69,6 +69,9 @@ def main():
"split:|",
"check:string.alphanumeric"
],
"empty_list": [
"split:,",
],
"str_length": [
"length:2-10"
],
@@ -81,6 +84,7 @@ def main():
"int_range_not_set_empty_set": [
"empty:"
],
"bool_var": ["convert:bool"],
"match_target": ["matching:foo"],
"match_target_list": ["split:,", "matching:foo|bar|baz",],
"match_source_a": ["in:match_target"],

View File

@@ -24,12 +24,19 @@ def main() -> None:
"lookup_value_c": "B02",
"replace_value": "R02",
},
{
"lookup_value_p": "A03",
"lookup_value_c": "B03",
"replace_value": "R03",
},
]
test_foo = ArraySearchList(
key = "lookup_value_p",
value = "A01"
key="lookup_value_p",
value="A01"
)
print(test_foo)
result = find_in_array_from_list(data, [test_foo])
print(f"Search A: {dump_data(test_foo)} -> {dump_data(result)}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
@@ -38,12 +45,122 @@ def main() -> None:
{
"key": "lookup_value_c",
"value": "B01"
},
]
result = find_in_array_from_list(data, search)
print(f"Search B: {dump_data(search)} -> {dump_data(result)}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": "A01"
},
{
"key": "lookup_value_c",
"value": "B01"
},
{
"key": "lookup_value_c",
"value": "B02"
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search C: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search C raised KeyError: {e}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": "A01"
},
{
"key": "lookup_value_c",
"value": ["B01", "B02"]
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search D: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search D raised KeyError: {e}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": ["A01", "A03"]
},
{
"key": "lookup_value_c",
"value": ["B01", "B02"]
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search E: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search E raised KeyError: {e}")
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": "NOT FOUND"
},
]
try:
result = find_in_array_from_list(data, search)
print(f"Search F: {dump_data(search)} -> {dump_data(result)}")
except KeyError as e:
print(f"Search F raised KeyError: {e}")
data = [
{
"sd_user_id": "1593",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1592",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1596",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1594",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1595",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1861",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1862",
"email": "",
"employee_id": ""
},
{
"sd_user_id": "1860",
"email": "",
"employee_id": ""
}
]
result = find_in_array_from_list(data, search)
print(f"Search {dump_data(search)} -> {dump_data(result)}")
result = find_in_array_from_list(data, [ArraySearchList(
key="sd_user_id",
value="1593"
)])
print(f"Search F: -> {dump_data(result)}")
if __name__ == "__main__":

View File

@@ -2,7 +2,10 @@
test list helpers
"""
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list
from typing import Any
from corelibs.debug_handling.dump_data import dump_data
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list, make_unique_list_of_dicts
from corelibs.iterator_handling.fingerprint import dict_hash_crc
def __test_is_list_in_list_a():
@@ -18,9 +21,66 @@ def __convert_list():
print(f"IN: {source} -> {result}")
def __make_unique_list_of_dicts():
dict_list = [
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
{"a": 1, "b": 2, "nested": {"x": 10, "y": 20}},
{"b": 2, "a": 1, "nested": {"y": 20, "x": 10}},
{"b": 2, "a": 1, "nested": {"y": 20, "x": 30}},
{"a": 3, "b": 4, "nested": {"x": 30, "y": 40}}
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list = [
{"a": 1, 1: "one"},
{1: "one", "a": 1},
{"a": 2, 1: "one"}
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list = [
{"a": 1, "b": [1, 2, 3]},
{"b": [1, 2, 3], "a": 1},
{"a": 1, "b": [1, 2, 4]},
1, 2, "String", 1, "Foobar"
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list: list[Any] = [
[],
{},
[],
{},
{"a": []},
{"a": []},
{"a": {}},
{"a": {}},
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
dict_list: list[Any] = [
(1, 2),
(1, 2),
(2, 3),
]
unique_dicts = make_unique_list_of_dicts(dict_list)
dhf = dict_hash_crc(unique_dicts)
print(f"Unique dicts: {dump_data(unique_dicts)} [{dhf}]")
def main():
"""List helpers test runner"""
__test_is_list_in_list_a()
__convert_list()
__make_unique_list_of_dicts()
if __name__ == "__main__":

View File

@@ -27,7 +27,8 @@ def main():
"per_run_log": True,
# "console_format_type": ConsoleFormatSettings.NONE,
# "console_format_type": ConsoleFormatSettings.MINIMAL,
"console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL,
# "console_format_type": ConsoleFormat.TIME_MICROSECONDS | ConsoleFormat.NAME | ConsoleFormat.LEVEL,
"console_format_type": None,
# "console_format_type": ConsoleFormat.NAME,
# "console_format_type": (
# ConsoleFormat.TIME | ConsoleFormat.TIMEZONE | ConsoleFormat.LINENO | ConsoleFormat.LEVEL
@@ -121,10 +122,16 @@ def main():
log.set_log_level(Log.CONSOLE_HANDLER, LoggingLevel.DEBUG)
log.debug('Current logging format: %s', log.log_settings['console_format_type'])
log.debug('Current console formatter: %s', log.get_console_formatter())
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
log.info('Does hit show less')
log.info('Does hit show less A')
log.debug('Current console formatter after A: %s', log.get_console_formatter())
log.update_console_formatter(ConsoleFormat.TIME | ConsoleFormat.LINENO)
log.info('Does hit show less B')
log.debug('Current console formatter after B: %s', log.get_console_formatter())
log.update_console_formatter(ConsoleFormatSettings.ALL)
log.info('Does hit show less C')
log.debug('Current console formatter after C: %s', log.get_console_formatter())
print(f"*** Any handler is minimum level ERROR: {log.any_handler_is_minimum_level(LoggingLevel.ERROR)}")
print(f"*** Any handler is minimum level DEBUG: {log.any_handler_is_minimum_level(LoggingLevel.DEBUG)}")

View File

@@ -8,10 +8,21 @@ import re
import pytest
from corelibs.check_handling.regex_constants import (
compile_re,
SUB_EMAIL_BASIC_REGEX,
EMAIL_BASIC_REGEX,
NAME_EMAIL_SIMPLE_REGEX,
NAME_EMAIL_BASIC_REGEX,
DOMAIN_WITH_LOCALHOST_REGEX,
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
DOMAIN_REGEX,
DOMAIN_REGEX
)
from corelibs.check_handling.regex_constants_compiled import (
COMPILED_EMAIL_BASIC_REGEX,
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
COMPILED_NAME_EMAIL_BASIC_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
COMPILED_DOMAIN_REGEX,
)
@@ -48,7 +59,7 @@ class TestEmailBasicRegex:
@pytest.fixture
def email_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled email regex pattern."""
return compile_re(EMAIL_BASIC_REGEX)
return COMPILED_EMAIL_BASIC_REGEX
@pytest.mark.parametrize("valid_email", [
"user@example.com",
@@ -123,13 +134,272 @@ class TestEmailBasicRegex:
assert not email_pattern.match(email)
class TestSubEmailBasicRegex:
"""Test cases for SUB_EMAIL_BASIC_REGEX pattern (without anchors)."""
@pytest.fixture
def sub_email_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled sub email regex pattern."""
return compile_re(rf"^{SUB_EMAIL_BASIC_REGEX}$")
@pytest.mark.parametrize("valid_email", [
"user@example.com",
"test.user@example.com",
"user+tag@example.co.uk",
"first.last@subdomain.example.com",
"user123@test-domain.com",
"a@example.com",
"user_name@example.com",
"user-name@example.com",
"user@sub.domain.example.com",
"test!#$%&'*+-/=?^_`{|}~@example.com",
"1234567890@example.com",
])
def test_valid_emails_match(self, sub_email_pattern: re.Pattern[str], valid_email: str) -> None:
"""Test that valid email addresses match SUB_EMAIL_BASIC_REGEX."""
assert sub_email_pattern.match(valid_email), (
f"Failed to match valid email: {valid_email}"
)
@pytest.mark.parametrize("invalid_email", [
"",
"@example.com",
"user@",
"user",
"user@.com",
"user@domain",
"user @example.com",
".user@example.com",
"user@-example.com",
"user@example-.com",
"user@example.c",
"user@example.toolong",
])
def test_invalid_emails_no_match(self, sub_email_pattern: re.Pattern[str], invalid_email: str) -> None:
"""Test that invalid emails don't match SUB_EMAIL_BASIC_REGEX."""
assert not sub_email_pattern.match(invalid_email), (
f"Incorrectly matched invalid email: {invalid_email}"
)
def test_sub_email_max_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
"""Test email with maximum local part length (64 characters)."""
local_part = "a" * 64
email = f"{local_part}@example.com"
assert sub_email_pattern.match(email)
def test_sub_email_exceeds_local_part_length(self, sub_email_pattern: re.Pattern[str]) -> None:
"""Test email exceeding maximum local part length."""
local_part = "a" * 65
email = f"{local_part}@example.com"
assert not sub_email_pattern.match(email)
class TestNameEmailSimpleRegex:
"""Test cases for NAME_EMAIL_SIMPLE_REGEX pattern."""
@pytest.fixture
def name_email_simple_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled name+email simple regex pattern."""
return COMPILED_NAME_EMAIL_SIMPLE_REGEX
@pytest.mark.parametrize("test_input,expected_groups", [
('"John Doe" <john@example.com>', {'name1': 'John Doe', 'email1': 'john@example.com'}),
('John Doe <john@example.com>', {'name2': 'John Doe', 'email2': 'john@example.com'}),
('<john@example.com>', {'email3': 'john@example.com'}),
('john@example.com', {'email4': 'john@example.com'}),
(' "Jane Smith" <jane@test.com> ', {'name1': 'Jane Smith', 'email1': 'jane@test.com'}),
('Bob <bob@test.org>', {'name2': 'Bob', 'email2': 'bob@test.org'}),
])
def test_valid_name_email_combinations(
self, name_email_simple_pattern: re.Pattern[str], test_input: str, expected_groups: dict[str, str]
) -> None:
"""Test that valid name+email combinations match and extract correct groups."""
match = name_email_simple_pattern.match(test_input)
assert match is not None, f"Failed to match: {test_input}"
# Check that expected groups are present and match
for group_name, expected_value in expected_groups.items():
assert match.group(group_name) == expected_value, (
f"Group {group_name} expected '{expected_value}', got '{match.group(group_name)}'"
)
@pytest.mark.parametrize("invalid_input", [
"",
"not an email",
"<>",
'"Name Only"',
'Name <',
'<email',
'Name <<email@test.com>>',
'Name <email@test.com',
'Name email@test.com>',
])
def test_invalid_name_email_combinations(
self, name_email_simple_pattern: re.Pattern[str], invalid_input: str
) -> None:
"""Test that invalid inputs don't match NAME_EMAIL_SIMPLE_REGEX."""
assert not name_email_simple_pattern.match(invalid_input), (
f"Incorrectly matched invalid input: {invalid_input}"
)
def test_extract_name_from_quoted(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test extracting name from quoted format."""
match = name_email_simple_pattern.match('"Alice Wonder" <alice@example.com>')
assert match is not None
assert match.group('name1') == 'Alice Wonder'
assert match.group('email1') == 'alice@example.com'
def test_extract_name_from_unquoted(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test extracting name from unquoted format."""
match = name_email_simple_pattern.match('Bob Builder <bob@example.com>')
assert match is not None
assert match.group('name2') == 'Bob Builder'
assert match.group('email2') == 'bob@example.com'
def test_email_only_in_brackets(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test email-only format in angle brackets."""
match = name_email_simple_pattern.match('<charlie@example.com>')
assert match is not None
assert match.group('email3') == 'charlie@example.com'
def test_email_only_plain(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test plain email format without brackets."""
match = name_email_simple_pattern.match('dave@example.com')
assert match is not None
assert match.group('email4') == 'dave@example.com'
def test_whitespace_handling(
self, name_email_simple_pattern: re.Pattern[str]
) -> None:
"""Test that leading/trailing whitespace is handled correctly."""
match = name_email_simple_pattern.match(' "User Name" <user@example.com> ')
assert match is not None
assert match.group('name1') == 'User Name'
assert match.group('email1') == 'user@example.com'
class TestNameEmailBasicRegex:
"""Test cases for NAME_EMAIL_BASIC_REGEX pattern with strict email validation."""
@pytest.fixture
def name_email_basic_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled name+email basic regex pattern."""
return COMPILED_NAME_EMAIL_BASIC_REGEX
@pytest.mark.parametrize("test_input,expected_name,expected_email", [
('"John Doe" <john@example.com>', 'John Doe', 'john@example.com'),
('John Doe <john@example.com>', 'John Doe', 'john@example.com'),
('<john@example.com>', None, 'john@example.com'),
('john@example.com', None, 'john@example.com'),
(' "Jane Smith" <jane.smith@test.co.uk> ', 'Jane Smith', 'jane.smith@test.co.uk'),
('Alice Wonder <alice+tag@example.com>', 'Alice Wonder', 'alice+tag@example.com'),
])
def test_valid_name_email_with_validation(
self,
name_email_basic_pattern: re.Pattern[str],
test_input: str,
expected_name: str | None,
expected_email: str,
) -> None:
"""Test valid name+email with strict email validation."""
match = name_email_basic_pattern.match(test_input)
assert match is not None, f"Failed to match: {test_input}"
# Extract name and email from whichever group matched
name = match.group('name1') or match.group('name2')
email = (
match.group('email1') or match.group('email2') or
match.group('email3') or match.group('email4')
)
assert name == expected_name, f"Expected name '{expected_name}', got '{name}'"
assert email == expected_email, f"Expected email '{expected_email}', got '{email}'"
@pytest.mark.parametrize("invalid_input", [
'"John Doe" <invalid.email>', # invalid email format
'John Doe <@example.com>', # missing local part
'<user@>', # missing domain
'user@domain', # no TLD
'"Name" <user @example.com>', # space in email
'<.user@example.com>', # starts with dot
'user@-example.com', # domain starts with hyphen
'Name <user@example.c>', # TLD too short
'Name <user@example.toolongdomain>', # TLD too long
])
def test_invalid_email_format_rejected(
self, name_email_basic_pattern: re.Pattern[str], invalid_input: str
) -> None:
"""Test that inputs with invalid email formats are rejected."""
assert not name_email_basic_pattern.match(invalid_input), (
f"Incorrectly matched invalid input: {invalid_input}"
)
def test_quoted_name_with_valid_email(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test quoted name format with valid email."""
match = name_email_basic_pattern.match('"Alice Wonder" <alice@example.com>')
assert match is not None
assert match.group('name1') == 'Alice Wonder'
assert match.group('email1') == 'alice@example.com'
def test_unquoted_name_with_valid_email(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test unquoted name format with valid email."""
match = name_email_basic_pattern.match('Bob Builder <bob@example.com>')
assert match is not None
assert match.group('name2') == 'Bob Builder'
assert match.group('email2') == 'bob@example.com'
def test_email_only_formats(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test email-only formats (with and without brackets)."""
# With brackets
match1 = name_email_basic_pattern.match('<charlie@example.com>')
assert match1 is not None
assert match1.group('email3') == 'charlie@example.com'
# Without brackets
match2 = name_email_basic_pattern.match('dave@example.com')
assert match2 is not None
assert match2.group('email4') == 'dave@example.com'
def test_whitespace_handling(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test that leading/trailing whitespace is handled correctly."""
match = name_email_basic_pattern.match(' "User" <user@example.com> ')
assert match is not None
assert match.group('name1') == 'User'
assert match.group('email1') == 'user@example.com'
def test_special_characters_in_local_part(
self, name_email_basic_pattern: re.Pattern[str]
) -> None:
"""Test email with special characters in local part."""
match = name_email_basic_pattern.match('Test User <test!#$%&\'*+-/=?^_`{|}~@example.com>')
assert match is not None
assert match.group('name2') == 'Test User'
assert match.group('email2') == 'test!#$%&\'*+-/=?^_`{|}~@example.com'
class TestDomainWithLocalhostRegex:
"""Test cases for DOMAIN_WITH_LOCALHOST_REGEX pattern."""
@pytest.fixture
def domain_localhost_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled domain with localhost regex pattern."""
return compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
return COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
@pytest.mark.parametrize("valid_domain", [
"localhost",
@@ -181,7 +451,7 @@ class TestDomainWithLocalhostPortRegex:
@pytest.fixture
def domain_localhost_port_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled domain and localhost with port pattern."""
return compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
return COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
@pytest.mark.parametrize("valid_domain", [
"localhost",
@@ -247,7 +517,7 @@ class TestDomainRegex:
@pytest.fixture
def domain_pattern(self) -> re.Pattern[str]:
"""Fixture that returns compiled domain regex pattern."""
return compile_re(DOMAIN_REGEX)
return COMPILED_DOMAIN_REGEX
@pytest.mark.parametrize("valid_domain", [
"example.com",
@@ -306,6 +576,8 @@ class TestRegexPatternConsistency:
"""Test that all regex patterns can be compiled without errors."""
patterns = [
EMAIL_BASIC_REGEX,
NAME_EMAIL_SIMPLE_REGEX,
NAME_EMAIL_BASIC_REGEX,
DOMAIN_WITH_LOCALHOST_REGEX,
DOMAIN_WITH_LOCALHOST_PORT_REGEX,
DOMAIN_REGEX,
@@ -314,9 +586,24 @@ class TestRegexPatternConsistency:
compiled = compile_re(pattern)
assert isinstance(compiled, re.Pattern)
def test_compiled_patterns_are_patterns(self) -> None:
"""Test that all COMPILED_ constants are Pattern objects."""
compiled_patterns = [
COMPILED_EMAIL_BASIC_REGEX,
COMPILED_NAME_EMAIL_SIMPLE_REGEX,
COMPILED_NAME_EMAIL_BASIC_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_REGEX,
COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX,
COMPILED_DOMAIN_REGEX,
]
for pattern in compiled_patterns:
assert isinstance(pattern, re.Pattern)
def test_domain_patterns_are_strings(self) -> None:
"""Test that all regex constants are strings."""
assert isinstance(EMAIL_BASIC_REGEX, str)
assert isinstance(NAME_EMAIL_SIMPLE_REGEX, str)
assert isinstance(NAME_EMAIL_BASIC_REGEX, str)
assert isinstance(DOMAIN_WITH_LOCALHOST_REGEX, str)
assert isinstance(DOMAIN_WITH_LOCALHOST_PORT_REGEX, str)
assert isinstance(DOMAIN_REGEX, str)
@@ -325,8 +612,8 @@ class TestRegexPatternConsistency:
"""Test that domain patterns follow expected hierarchy."""
# DOMAIN_WITH_LOCALHOST_PORT_REGEX should accept everything
# DOMAIN_WITH_LOCALHOST_REGEX accepts
domain_localhost = compile_re(DOMAIN_WITH_LOCALHOST_REGEX)
domain_localhost_port = compile_re(DOMAIN_WITH_LOCALHOST_PORT_REGEX)
domain_localhost = COMPILED_DOMAIN_WITH_LOCALHOST_REGEX
domain_localhost_port = COMPILED_DOMAIN_WITH_LOCALHOST_PORT_REGEX
test_cases = ["example.com", "subdomain.example.com", "localhost"]
for test_case in test_cases:

View File

@@ -16,7 +16,7 @@ class TestSettingsLoaderInit:
def test_init_with_valid_config_file(self, tmp_path: Path):
"""Test initialization with a valid config file"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[Section]\nkey=value\n")
loader = SettingsLoader(
@@ -35,7 +35,7 @@ class TestSettingsLoaderInit:
def test_init_with_missing_config_file(self, tmp_path: Path):
"""Test initialization with missing config file"""
config_file = tmp_path / "missing.ini"
config_file = tmp_path.joinpath("missing.ini")
loader = SettingsLoader(
args={},
@@ -60,7 +60,7 @@ class TestSettingsLoaderInit:
def test_init_with_log(self, tmp_path: Path):
"""Test initialization with Log object"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[Section]\nkey=value\n")
mock_log = Mock(spec=Log)
@@ -80,7 +80,7 @@ class TestLoadSettings:
def test_load_settings_basic(self, tmp_path: Path):
"""Test loading basic settings without validation"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nkey1=value1\nkey2=value2\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -90,7 +90,7 @@ class TestLoadSettings:
def test_load_settings_with_missing_section(self, tmp_path: Path):
"""Test loading settings with missing section"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[OtherSection]\nkey=value\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -100,7 +100,7 @@ class TestLoadSettings:
def test_load_settings_allow_not_exist(self, tmp_path: Path):
"""Test loading settings with allow_not_exist flag"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[OtherSection]\nkey=value\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -110,7 +110,7 @@ class TestLoadSettings:
def test_load_settings_mandatory_field_present(self, tmp_path: Path):
"""Test mandatory field validation when field is present"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nrequired_field=value\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -123,7 +123,7 @@ class TestLoadSettings:
def test_load_settings_mandatory_field_missing(self, tmp_path: Path):
"""Test mandatory field validation when field is missing"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nother_field=value\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -136,7 +136,7 @@ class TestLoadSettings:
def test_load_settings_mandatory_field_empty(self, tmp_path: Path):
"""Test mandatory field validation when field is empty"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nrequired_field=\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -149,7 +149,7 @@ class TestLoadSettings:
def test_load_settings_with_split(self, tmp_path: Path):
"""Test splitting values into lists"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a,b,c,d\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -162,7 +162,7 @@ class TestLoadSettings:
def test_load_settings_with_custom_split_char(self, tmp_path: Path):
"""Test splitting with custom delimiter"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a|b|c|d\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -175,7 +175,7 @@ class TestLoadSettings:
def test_load_settings_split_removes_spaces(self, tmp_path: Path):
"""Test that split removes spaces from values"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a, b , c , d\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -188,7 +188,7 @@ class TestLoadSettings:
def test_load_settings_empty_split_char_fallback(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test fallback to default split char when empty"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=a,b,c\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -201,9 +201,22 @@ class TestLoadSettings:
captured = capsys.readouterr()
assert "fallback to:" in captured.out
def test_load_settings_split_empty_value(self, tmp_path: Path):
"""Test that split on empty value results in empty list"""
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist_field=\n")
loader = SettingsLoader(args={}, config_file=config_file)
result = loader.load_settings(
"TestSection",
{"list_field": ["split:,"]}
)
assert result["list_field"] == []
def test_load_settings_convert_to_int(self, tmp_path: Path):
"""Test converting values to int"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=123\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -217,7 +230,7 @@ class TestLoadSettings:
def test_load_settings_convert_to_float(self, tmp_path: Path):
"""Test converting values to float"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=123.45\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -231,7 +244,7 @@ class TestLoadSettings:
def test_load_settings_convert_to_bool_true(self, tmp_path: Path):
"""Test converting values to boolean True"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nflag1=true\nflag2=True\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -245,7 +258,7 @@ class TestLoadSettings:
def test_load_settings_convert_to_bool_false(self, tmp_path: Path):
"""Test converting values to boolean False"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nflag1=false\nflag2=False\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -259,7 +272,7 @@ class TestLoadSettings:
def test_load_settings_convert_invalid_type(self, tmp_path: Path):
"""Test converting with invalid type raises error"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -272,7 +285,7 @@ class TestLoadSettings:
def test_load_settings_empty_set_to_none(self, tmp_path: Path):
"""Test setting empty values to None"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nother=value\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -285,7 +298,7 @@ class TestLoadSettings:
def test_load_settings_empty_set_to_custom_value(self, tmp_path: Path):
"""Test setting empty values to custom value"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nother=value\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -298,7 +311,7 @@ class TestLoadSettings:
def test_load_settings_matching_valid(self, tmp_path: Path):
"""Test matching validation with valid value"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nmode=production\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -311,7 +324,7 @@ class TestLoadSettings:
def test_load_settings_matching_invalid(self, tmp_path: Path):
"""Test matching validation with invalid value"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nmode=invalid\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -324,7 +337,7 @@ class TestLoadSettings:
def test_load_settings_in_valid(self, tmp_path: Path):
"""Test 'in' validation with valid value"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=b\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -340,7 +353,7 @@ class TestLoadSettings:
def test_load_settings_in_invalid(self, tmp_path: Path):
"""Test 'in' validation with invalid value"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nallowed=a,b,c\nvalue=d\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -356,7 +369,7 @@ class TestLoadSettings:
def test_load_settings_in_missing_target(self, tmp_path: Path):
"""Test 'in' validation with missing target"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=a\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -369,7 +382,7 @@ class TestLoadSettings:
def test_load_settings_length_exact(self, tmp_path: Path):
"""Test length validation with exact match"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -382,7 +395,7 @@ class TestLoadSettings:
def test_load_settings_length_exact_invalid(self, tmp_path: Path):
"""Test length validation with exact match failure"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -395,7 +408,7 @@ class TestLoadSettings:
def test_load_settings_length_range(self, tmp_path: Path):
"""Test length validation with range"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=testing\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -408,7 +421,7 @@ class TestLoadSettings:
def test_load_settings_length_min_only(self, tmp_path: Path):
"""Test length validation with minimum only"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=testing\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -421,7 +434,7 @@ class TestLoadSettings:
def test_load_settings_length_max_only(self, tmp_path: Path):
"""Test length validation with maximum only"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -434,7 +447,7 @@ class TestLoadSettings:
def test_load_settings_range_valid(self, tmp_path: Path):
"""Test range validation with valid value"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=25\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -447,7 +460,7 @@ class TestLoadSettings:
def test_load_settings_range_invalid(self, tmp_path: Path):
"""Test range validation with invalid value"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=100\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -460,7 +473,7 @@ class TestLoadSettings:
def test_load_settings_check_int_valid(self, tmp_path: Path):
"""Test check:int with valid integer"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=12345\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -473,7 +486,7 @@ class TestLoadSettings:
def test_load_settings_check_int_cleanup(self, tmp_path: Path):
"""Test check:int with cleanup"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nnumber=12a34b5\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -486,7 +499,7 @@ class TestLoadSettings:
def test_load_settings_check_email_valid(self, tmp_path: Path):
"""Test check:string.email.basic with valid email"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nemail=test@example.com\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -499,7 +512,7 @@ class TestLoadSettings:
def test_load_settings_check_email_invalid(self, tmp_path: Path):
"""Test check:string.email.basic with invalid email"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nemail=not-an-email\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -512,7 +525,7 @@ class TestLoadSettings:
def test_load_settings_args_override(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test command line arguments override config values"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader(
@@ -530,7 +543,7 @@ class TestLoadSettings:
def test_load_settings_args_no_flag(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test default behavior (no args_override:yes) with list argument that has split"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=a,b,c\n")
loader = SettingsLoader(
@@ -550,7 +563,7 @@ class TestLoadSettings:
def test_load_settings_args_list_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test that list arguments without split entry are skipped"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader(
@@ -570,7 +583,7 @@ class TestLoadSettings:
def test_load_settings_args_list_with_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test that list arguments with split entry and args_override:yes are applied"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=a,b,c\n")
loader = SettingsLoader(
@@ -589,7 +602,7 @@ class TestLoadSettings:
def test_load_settings_args_no_with_mandatory(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test default behavior (no args_override:yes) with mandatory field and list args with split"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config1,config2\n")
loader = SettingsLoader(
@@ -609,7 +622,7 @@ class TestLoadSettings:
def test_load_settings_args_no_with_mandatory_valid(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test default behavior with string args (always overrides due to current logic)"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader(
@@ -628,7 +641,7 @@ class TestLoadSettings:
def test_load_settings_args_string_no_split(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test that string arguments with args_override:yes work normally"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=config_value\n")
loader = SettingsLoader(
@@ -647,7 +660,7 @@ class TestLoadSettings:
def test_load_settings_no_config_file_with_args(self, tmp_path: Path):
"""Test loading settings without config file but with mandatory args"""
config_file = tmp_path / "missing.ini"
config_file = tmp_path.joinpath("missing.ini")
loader = SettingsLoader(
args={"required": "value"},
@@ -662,7 +675,7 @@ class TestLoadSettings:
def test_load_settings_no_config_file_missing_args(self, tmp_path: Path):
"""Test loading settings without config file and missing args"""
config_file = tmp_path / "missing.ini"
config_file = tmp_path.joinpath("missing.ini")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -674,7 +687,7 @@ class TestLoadSettings:
def test_load_settings_check_list_with_split(self, tmp_path: Path):
"""Test check validation with list values"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist=abc,def,ghi\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -687,7 +700,7 @@ class TestLoadSettings:
def test_load_settings_check_list_cleanup(self, tmp_path: Path):
"""Test check validation cleans up list values"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nlist=ab-c,de_f,gh!i\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -700,7 +713,7 @@ class TestLoadSettings:
def test_load_settings_invalid_check_type(self, tmp_path: Path):
"""Test with invalid check type"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text("[TestSection]\nvalue=test\n")
loader = SettingsLoader(args={}, config_file=config_file)
@@ -717,7 +730,7 @@ class TestComplexScenarios:
def test_complex_validation_scenario(self, tmp_path: Path):
"""Test complex scenario with multiple validations"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text(
"[Production]\n"
"environment=production\n"
@@ -758,7 +771,7 @@ class TestComplexScenarios:
def test_email_list_validation(self, tmp_path: Path):
"""Test email list with validation"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text(
"[EmailConfig]\n"
"emails=test@example.com,admin@domain.org,user+tag@site.co.uk\n"
@@ -775,7 +788,7 @@ class TestComplexScenarios:
def test_mixed_args_and_config(self, tmp_path: Path):
"""Test mixing command line args and config file"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text(
"[Settings]\n"
"value1=config_value1\n"
@@ -796,7 +809,7 @@ class TestComplexScenarios:
def test_multiple_check_types(self, tmp_path: Path):
"""Test multiple different check types"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text(
"[Checks]\n"
"numbers=123,456,789\n"
@@ -823,7 +836,7 @@ class TestComplexScenarios:
def test_args_no_and_list_skip_combination(self, tmp_path: Path, capsys: CaptureFixture[str]):
"""Test combination of args_override:yes flag and list argument skip behavior"""
config_file = tmp_path / "test.ini"
config_file = tmp_path.joinpath("test.ini")
config_file.write_text(
"[Settings]\n"
"no_override=a,b,c\n"

View File

@@ -4,7 +4,101 @@ tests for corelibs.iterator_handling.fingerprint
from typing import Any
import pytest
from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc
from corelibs.iterator_handling.fingerprint import dict_hash_frozen, dict_hash_crc, hash_object
class TestHashObject:
"""Tests for hash_object function"""
def test_hash_object_simple_dict(self):
"""Test hashing a simple dictionary with hash_object"""
data = {"key1": "value1", "key2": "value2"}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64 # SHA256 produces 64 hex characters
def test_hash_object_mixed_keys(self):
"""Test hash_object with mixed int and string keys"""
data = {"key1": "value1", 1: "value2", 2: "value3"}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_consistency(self):
"""Test that hash_object produces consistent results"""
data = {"str_key": "value", 123: "number_key"}
hash1 = hash_object(data)
hash2 = hash_object(data)
assert hash1 == hash2
def test_hash_object_order_independence(self):
"""Test that hash_object is order-independent"""
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
hash1 = hash_object(data1)
hash2 = hash_object(data2)
assert hash1 == hash2
def test_hash_object_list_of_dicts_mixed_keys(self):
"""Test hash_object with list of dicts containing mixed keys"""
data = [
{"name": "item1", 1: "value1"},
{"name": "item2", 2: "value2"}
]
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_nested_mixed_keys(self):
"""Test hash_object with nested structures containing mixed keys"""
data = {
"outer": {
"inner": "value",
1: "mixed_key"
},
2: "another_mixed"
}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_different_data(self):
"""Test that different data produces different hashes"""
data1 = {"key": "value", 1: "one"}
data2 = {"key": "value", 2: "two"}
hash1 = hash_object(data1)
hash2 = hash_object(data2)
assert hash1 != hash2
def test_hash_object_complex_nested(self):
"""Test hash_object with complex nested structures"""
data = {
"level1": {
"level2": {
1: "value",
"key": [1, 2, {"nested": "deep", 3: "int_key"}]
}
}
}
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
def test_hash_object_list_with_tuples(self):
"""Test hash_object with lists containing tuples"""
data = [("a", 1), ("b", 2), {1: "mixed", "key": "value"}]
result = hash_object(data)
assert isinstance(result, str)
assert len(result) == 64
class TestDictHashFrozen:
@@ -279,6 +373,116 @@ class TestDictHashCrc:
assert isinstance(result, str)
assert len(result) == 64
def test_dict_hash_crc_fallback_mixed_keys(self):
"""Test dict_hash_crc fallback with mixed int and string keys"""
data = {"key1": "value1", 1: "value2", 2: "value3"}
result = dict_hash_crc(data)
assert isinstance(result, str)
# Fallback prefixes with "HO_"
assert result.startswith("HO_")
# Hash should be 64 chars + 3 char prefix = 67 total
assert len(result) == 67
def test_dict_hash_crc_fallback_consistency(self):
"""Test that fallback produces consistent hashes"""
data = {"str_key": "value", 123: "number_key", 456: "another"}
hash1 = dict_hash_crc(data)
hash2 = dict_hash_crc(data)
assert hash1 == hash2
assert hash1.startswith("HO_")
def test_dict_hash_crc_fallback_order_independence(self):
"""Test that fallback is order-independent for mixed-key dicts"""
data1 = {"a": 1, 1: "one", "b": 2, 2: "two"}
data2 = {2: "two", "b": 2, 1: "one", "a": 1}
hash1 = dict_hash_crc(data1)
hash2 = dict_hash_crc(data2)
assert hash1 == hash2
assert hash1.startswith("HO_")
def test_dict_hash_crc_fallback_list_of_dicts_mixed_keys(self):
"""Test fallback with list of dicts containing mixed keys"""
data = [
{"name": "item1", 1: "value1"},
{"name": "item2", 2: "value2"},
{3: "value3", "type": "mixed"}
]
result = dict_hash_crc(data)
assert isinstance(result, str)
assert result.startswith("HO_")
assert len(result) == 67
def test_dict_hash_crc_fallback_nested_mixed_keys(self):
"""Test fallback with nested dicts containing mixed keys"""
data = {
"outer": {
"inner": "value",
1: "mixed_key"
},
2: "another_mixed"
}
result = dict_hash_crc(data)
assert isinstance(result, str)
assert result.startswith("HO_")
assert len(result) == 67
def test_dict_hash_crc_fallback_different_data(self):
"""Test that different mixed-key data produces different hashes"""
data1 = {"key": "value", 1: "one"}
data2 = {"key": "value", 2: "two"}
hash1 = dict_hash_crc(data1)
hash2 = dict_hash_crc(data2)
assert hash1 != hash2
assert hash1.startswith("HO_")
assert hash2.startswith("HO_")
def test_dict_hash_crc_fallback_complex_structure(self):
"""Test fallback with complex nested structure with mixed keys"""
data = [
{
"id": 1,
1: "first",
"data": {
"nested": "value",
100: "nested_int_key"
}
},
{
"id": 2,
2: "second",
"items": [1, 2, 3]
}
]
result = dict_hash_crc(data)
assert isinstance(result, str)
assert result.startswith("HO_")
assert len(result) == 67
def test_dict_hash_crc_no_fallback_string_keys_only(self):
"""Test that string-only keys don't trigger fallback"""
data = {"key1": "value1", "key2": "value2", "key3": "value3"}
result = dict_hash_crc(data)
assert isinstance(result, str)
assert not result.startswith("HO_")
assert len(result) == 64
def test_dict_hash_crc_no_fallback_int_keys_only(self):
"""Test that int-only keys don't trigger fallback"""
data = {1: "one", 2: "two", 3: "three"}
result = dict_hash_crc(data)
assert isinstance(result, str)
assert not result.startswith("HO_")
assert len(result) == 64
class TestComparisonBetweenHashFunctions:
"""Tests comparing dict_hash_frozen and dict_hash_crc"""

View File

@@ -4,7 +4,7 @@ iterator_handling.list_helepr tests
from typing import Any
import pytest
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list, make_unique_list_of_dicts
class TestConvertToList:
@@ -298,3 +298,225 @@ class TestPerformance:
# Should still work correctly despite duplicates
assert set(result) == {1, 3}
assert isinstance(result, list)
class TestMakeUniqueListOfDicts:
"""Test cases for make_unique_list_of_dicts function"""
def test_basic_duplicate_removal(self):
"""Test basic removal of duplicate dictionaries"""
dict_list = [
{"a": 1, "b": 2},
{"a": 1, "b": 2},
{"a": 3, "b": 4}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": 1, "b": 2} in result
assert {"a": 3, "b": 4} in result
def test_order_independent_duplicates(self):
"""Test that dictionaries with different key orders are treated as duplicates"""
dict_list = [
{"a": 1, "b": 2},
{"b": 2, "a": 1}, # Same content, different order
{"a": 3, "b": 4}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": 1, "b": 2} in result
assert {"a": 3, "b": 4} in result
def test_empty_list(self):
"""Test with empty list"""
result = make_unique_list_of_dicts([])
assert result == []
assert isinstance(result, list)
def test_single_dict(self):
"""Test with single dictionary"""
dict_list = [{"a": 1, "b": 2}]
result = make_unique_list_of_dicts(dict_list)
assert result == [{"a": 1, "b": 2}]
def test_all_unique(self):
"""Test when all dictionaries are unique"""
dict_list = [
{"a": 1},
{"b": 2},
{"c": 3},
{"d": 4}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 4
for d in dict_list:
assert d in result
def test_all_duplicates(self):
"""Test when all dictionaries are duplicates"""
dict_list = [
{"a": 1, "b": 2},
{"a": 1, "b": 2},
{"a": 1, "b": 2},
{"b": 2, "a": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 1
assert result[0] == {"a": 1, "b": 2}
def test_nested_values(self):
"""Test with nested structures as values"""
dict_list = [
{"a": [1, 2], "b": 3},
{"a": [1, 2], "b": 3},
{"a": [1, 3], "b": 3}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": [1, 2], "b": 3} in result
assert {"a": [1, 3], "b": 3} in result
def test_different_value_types(self):
"""Test with different value types"""
dict_list = [
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
{"str": "hello", "int": 42, "float": 3.14, "bool": True},
{"str": "world", "int": 99, "float": 2.71, "bool": False}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_empty_dicts(self):
"""Test with empty dictionaries"""
dict_list: list[Any] = [
{},
{},
{"a": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {} in result
assert {"a": 1} in result
def test_single_key_dicts(self):
"""Test with single key dictionaries"""
dict_list = [
{"a": 1},
{"a": 1},
{"a": 2},
{"b": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 3
assert {"a": 1} in result
assert {"a": 2} in result
assert {"b": 1} in result
def test_many_keys(self):
"""Test with dictionaries containing many keys"""
dict1 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
dict2 = {"e": 5, "d": 4, "c": 3, "b": 2, "a": 1} # Same, different order
dict3 = {"a": 1, "b": 2, "c": 3, "d": 4, "e": 6} # Different value
dict_list = [dict1, dict2, dict3]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_numeric_keys(self):
"""Test with numeric keys"""
dict_list = [
{1: "one", 2: "two"},
{2: "two", 1: "one"},
{1: "one", 2: "three"}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_none_values(self):
"""Test with None values"""
dict_list = [
{"a": None, "b": 2},
{"a": None, "b": 2},
{"a": 1, "b": None}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"a": None, "b": 2} in result
assert {"a": 1, "b": None} in result
def test_mixed_key_types(self):
"""Test with mixed key types (string and numeric)"""
dict_list = [
{"a": 1, 1: "one"},
{1: "one", "a": 1},
{"a": 2, 1: "one"}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
@pytest.mark.parametrize("dict_list,expected_length", [
([{"a": 1}, {"a": 1}, {"a": 1}], 1),
([{"a": 1}, {"a": 2}, {"a": 3}], 3),
([{"a": 1, "b": 2}, {"b": 2, "a": 1}], 1),
([{}, {}], 1),
([{"x": [1, 2]}, {"x": [1, 2]}], 1),
([{"a": 1}, {"b": 2}, {"c": 3}], 3),
]) # pyright: ignore[reportUnknownArgumentType]
def test_parametrized_unique_dicts(self, dict_list: list[Any], expected_length: int):
"""Test make_unique_list_of_dicts with various input combinations"""
result = make_unique_list_of_dicts(dict_list)
assert len(result) == expected_length
assert isinstance(result, list)
def test_large_list(self):
"""Test with a large list of dictionaries"""
dict_list = [{"id": i % 100, "value": f"val_{i % 100}"} for i in range(1000)]
result = make_unique_list_of_dicts(dict_list)
# Should have 100 unique dicts (0-99)
assert len(result) == 100
def test_preserves_last_occurrence(self):
"""Test behavior with duplicate entries"""
# The function uses dict comprehension, which keeps last occurrence
dict_list = [
{"a": 1, "b": 2},
{"a": 3, "b": 4},
{"a": 1, "b": 2}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
# Just verify correct unique count, order may vary
def test_nested_dicts(self):
"""Test with nested dictionaries"""
dict_list = [
{"outer": {"inner": 1}},
{"outer": {"inner": 1}},
{"outer": {"inner": 2}}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
def test_string_values_case_sensitive(self):
"""Test that string values are case-sensitive"""
dict_list = [
{"name": "John"},
{"name": "john"},
{"name": "JOHN"},
{"name": "John"}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 3
def test_boolean_values(self):
"""Test with boolean values"""
dict_list = [
{"flag": True, "count": 1},
{"count": 1, "flag": True},
{"flag": False, "count": 1}
]
result = make_unique_list_of_dicts(dict_list)
assert len(result) == 2
assert {"flag": True, "count": 1} in result
assert {"flag": False, "count": 1} in result
# __END__

View File

@@ -28,6 +28,7 @@ def tmp_log_path(tmp_path: Path) -> Path:
@pytest.fixture
def basic_log_settings() -> LogSettings:
"""Basic log settings for testing"""
# Return a new dict each time to avoid state pollution
return {
"log_level_console": LoggingLevel.WARNING,
"log_level_file": LoggingLevel.DEBUG,
@@ -308,4 +309,54 @@ class TestUpdateConsoleFormatter:
# Verify message was logged
assert "Test warning message" in caplog.text
def test_log_console_format_option_set_to_none(
self, tmp_log_path: Path
):
"""Test that when log_console_format option is set to None, it uses ConsoleFormatSettings.ALL"""
# Save the original DEFAULT_LOG_SETTINGS to restore it after test
original_default = Log.DEFAULT_LOG_SETTINGS.copy()
try:
# Reset DEFAULT_LOG_SETTINGS to ensure clean state
Log.DEFAULT_LOG_SETTINGS = {
"log_level_console": Log.DEFAULT_LOG_LEVEL_CONSOLE,
"log_level_file": Log.DEFAULT_LOG_LEVEL_FILE,
"per_run_log": False,
"console_enabled": True,
"console_color_output_enabled": True,
"console_format_type": ConsoleFormatSettings.ALL,
"add_start_info": True,
"add_end_info": False,
"log_queue": None,
}
# Create a fresh settings dict with console_format_type explicitly set to None
settings: LogSettings = {
"log_level_console": LoggingLevel.WARNING,
"log_level_file": LoggingLevel.DEBUG,
"per_run_log": False,
"console_enabled": True,
"console_color_output_enabled": False,
"console_format_type": None, # type: ignore
"add_start_info": False,
"add_end_info": False,
"log_queue": None,
}
# Verify that None is explicitly set in the input
assert settings['console_format_type'] is None
log = Log(
log_path=tmp_log_path,
log_name="test_log",
log_settings=settings
)
# Verify that None was replaced with ConsoleFormatSettings.ALL
# The Log class should replace None with the default value (ALL)
assert log.log_settings['console_format_type'] == ConsoleFormatSettings.ALL
finally:
# Restore original DEFAULT_LOG_SETTINGS
Log.DEFAULT_LOG_SETTINGS = original_default
# __END__

View File

@@ -2,11 +2,10 @@
PyTest: requests_handling/caller
"""
from typing import Any
from unittest.mock import Mock, patch
import pytest
import requests
from corelibs.requests_handling.caller import Caller
from corelibs.requests_handling.caller import Caller, ErrorResponse, ProxyConfig
class TestCallerInit:
@@ -21,13 +20,17 @@ class TestCallerInit:
assert caller.timeout == 20
assert caller.verify is True
assert caller.proxy is None
assert caller.cafile is None
assert caller.ca_file is None
def test_init_with_all_params(self):
"""Test Caller initialization with all parameters"""
header = {"Authorization": "Bearer token", "Content-Type": "application/json"}
proxy = {"http": "http://proxy.example.com:8080", "https": "https://proxy.example.com:8080"}
caller = Caller(header=header, verify=False, timeout=30, proxy=proxy)
proxy: ProxyConfig = {
"type": "socks5",
"host": "proxy.example.com:8080",
"port": "8080"
}
caller = Caller(header=header, timeout=30, proxy=proxy, verify=False)
assert caller.headers == header
assert caller.timeout == 30
@@ -58,7 +61,7 @@ class TestCallerInit:
ca_file_path = "/path/to/ca/cert.pem"
caller = Caller(header={}, ca_file=ca_file_path)
assert caller.cafile == ca_file_path
assert caller.ca_file == ca_file_path
class TestCallerGet:
@@ -81,7 +84,8 @@ class TestCallerGet:
headers={"Authorization": "Bearer token"},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.get')
@@ -101,7 +105,8 @@ class TestCallerGet:
headers={},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.get')
@@ -134,7 +139,11 @@ class TestCallerGet:
mock_response = Mock(spec=requests.Response)
mock_get.return_value = mock_response
proxy = {"http": "http://proxy.example.com:8080"}
proxy: ProxyConfig = {
"type": "socks5",
"host": "proxy.example.com:8080",
"port": "8080"
}
caller = Caller(header={}, proxy=proxy)
caller.get("https://api.example.com/data")
@@ -142,40 +151,46 @@ class TestCallerGet:
assert mock_get.call_args[1]["proxies"] == proxy
@patch('corelibs.requests_handling.caller.requests.get')
def test_get_invalid_schema_returns_none(self, mock_get: Mock, capsys: Any):
"""Test GET request with invalid URL schema returns None"""
def test_get_invalid_schema_returns_none(self, mock_get: Mock):
"""Test GET request with invalid URL schema returns ErrorResponse"""
mock_get.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
caller = Caller(header={})
response = caller.get("invalid://example.com")
assert response is None
captured = capsys.readouterr()
assert "Invalid URL during 'get'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 200
assert "Invalid URL during 'get'" in response.message
assert response.action == "get"
assert response.url == "invalid://example.com"
@patch('corelibs.requests_handling.caller.requests.get')
def test_get_timeout_returns_none(self, mock_get: Mock, capsys: Any):
"""Test GET request timeout returns None"""
def test_get_timeout_returns_none(self, mock_get: Mock):
"""Test GET request timeout returns ErrorResponse"""
mock_get.side_effect = requests.exceptions.ReadTimeout("Timeout")
caller = Caller(header={})
response = caller.get("https://api.example.com/data")
assert response is None
captured = capsys.readouterr()
assert "Timeout (20s) during 'get'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 300
assert "Timeout (20s) during 'get'" in response.message
assert response.action == "get"
assert response.url == "https://api.example.com/data"
@patch('corelibs.requests_handling.caller.requests.get')
def test_get_connection_error_returns_none(self, mock_get: Mock, capsys: Any):
"""Test GET request connection error returns None"""
def test_get_connection_error_returns_none(self, mock_get: Mock):
"""Test GET request connection error returns ErrorResponse"""
mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed")
caller = Caller(header={})
response = caller.get("https://api.example.com/data")
assert response is None
captured = capsys.readouterr()
assert "Connection error during 'get'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 400
assert "Connection error during 'get'" in response.message
assert response.action == "get"
assert response.url == "https://api.example.com/data"
class TestCallerPost:
@@ -200,7 +215,8 @@ class TestCallerPost:
headers={"Content-Type": "application/json"},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.post')
@@ -234,40 +250,46 @@ class TestCallerPost:
assert mock_post.call_args[1]["json"] == data
@patch('corelibs.requests_handling.caller.requests.post')
def test_post_invalid_schema_returns_none(self, mock_post: Mock, capsys: Any):
"""Test POST request with invalid URL schema returns None"""
def test_post_invalid_schema_returns_none(self, mock_post: Mock):
"""Test POST request with invalid URL schema returns ErrorResponse"""
mock_post.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
caller = Caller(header={})
response = caller.post("invalid://example.com", data={"test": "data"})
assert response is None
captured = capsys.readouterr()
assert "Invalid URL during 'post'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 200
assert "Invalid URL during 'post'" in response.message
assert response.action == "post"
assert response.url == "invalid://example.com"
@patch('corelibs.requests_handling.caller.requests.post')
def test_post_timeout_returns_none(self, mock_post: Mock, capsys: Any):
"""Test POST request timeout returns None"""
def test_post_timeout_returns_none(self, mock_post: Mock):
"""Test POST request timeout returns ErrorResponse"""
mock_post.side_effect = requests.exceptions.ReadTimeout("Timeout")
caller = Caller(header={})
response = caller.post("https://api.example.com/data", data={"test": "data"})
assert response is None
captured = capsys.readouterr()
assert "Timeout (20s) during 'post'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 300
assert "Timeout (20s) during 'post'" in response.message
assert response.action == "post"
assert response.url == "https://api.example.com/data"
@patch('corelibs.requests_handling.caller.requests.post')
def test_post_connection_error_returns_none(self, mock_post: Mock, capsys: Any):
"""Test POST request connection error returns None"""
def test_post_connection_error_returns_none(self, mock_post: Mock):
"""Test POST request connection error returns ErrorResponse"""
mock_post.side_effect = requests.exceptions.ConnectionError("Connection failed")
caller = Caller(header={})
response = caller.post("https://api.example.com/data", data={"test": "data"})
assert response is None
captured = capsys.readouterr()
assert "Connection error during 'post'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 400
assert "Connection error during 'post'" in response.message
assert response.action == "post"
assert response.url == "https://api.example.com/data"
class TestCallerPut:
@@ -292,7 +314,8 @@ class TestCallerPut:
headers={"Content-Type": "application/json"},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.put')
@@ -311,16 +334,18 @@ class TestCallerPut:
assert mock_put.call_args[1]["params"] == params
@patch('corelibs.requests_handling.caller.requests.put')
def test_put_timeout_returns_none(self, mock_put: Mock, capsys: Any):
"""Test PUT request timeout returns None"""
def test_put_timeout_returns_none(self, mock_put: Mock):
"""Test PUT request timeout returns ErrorResponse"""
mock_put.side_effect = requests.exceptions.ReadTimeout("Timeout")
caller = Caller(header={})
response = caller.put("https://api.example.com/data/1", data={"test": "data"})
assert response is None
captured = capsys.readouterr()
assert "Timeout (20s) during 'put'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 300
assert "Timeout (20s) during 'put'" in response.message
assert response.action == "put"
assert response.url == "https://api.example.com/data/1"
class TestCallerPatch:
@@ -345,7 +370,8 @@ class TestCallerPatch:
headers={"Content-Type": "application/json"},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.patch')
@@ -364,16 +390,18 @@ class TestCallerPatch:
assert mock_patch.call_args[1]["params"] == params
@patch('corelibs.requests_handling.caller.requests.patch')
def test_patch_connection_error_returns_none(self, mock_patch: Mock, capsys: Any):
"""Test PATCH request connection error returns None"""
def test_patch_connection_error_returns_none(self, mock_patch: Mock):
"""Test PATCH request connection error returns ErrorResponse"""
mock_patch.side_effect = requests.exceptions.ConnectionError("Connection failed")
caller = Caller(header={})
response = caller.patch("https://api.example.com/data/1", data={"test": "data"})
assert response is None
captured = capsys.readouterr()
assert "Connection error during 'patch'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 400
assert "Connection error during 'patch'" in response.message
assert response.action == "patch"
assert response.url == "https://api.example.com/data/1"
class TestCallerDelete:
@@ -396,7 +424,8 @@ class TestCallerDelete:
headers={"Authorization": "Bearer token"},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.delete')
@@ -414,16 +443,18 @@ class TestCallerDelete:
assert mock_delete.call_args[1]["params"] == params
@patch('corelibs.requests_handling.caller.requests.delete')
def test_delete_invalid_schema_returns_none(self, mock_delete: Mock, capsys: Any):
"""Test DELETE request with invalid URL schema returns None"""
def test_delete_invalid_schema_returns_none(self, mock_delete: Mock):
"""Test DELETE request with invalid URL schema returns ErrorResponse"""
mock_delete.side_effect = requests.exceptions.InvalidSchema("Invalid URL")
caller = Caller(header={})
response = caller.delete("invalid://example.com/data/1")
assert response is None
captured = capsys.readouterr()
assert "Invalid URL during 'delete'" in captured.out
assert isinstance(response, ErrorResponse)
assert response.code == 200
assert "Invalid URL during 'delete'" in response.message
assert response.action == "delete"
assert response.url == "invalid://example.com/data/1"
class TestCallerParametrized:
@@ -492,7 +523,7 @@ class TestCallerParametrized:
])
@patch('corelibs.requests_handling.caller.requests.get')
def test_exception_handling(
self, mock_get: Mock, exception_class: type, expected_message: str, capsys: Any
self, mock_get: Mock, exception_class: type, expected_message: str
):
"""Test exception handling for all exception types"""
mock_get.side_effect = exception_class("Test error")
@@ -500,9 +531,8 @@ class TestCallerParametrized:
caller = Caller(header={})
response = caller.get("https://api.example.com/data")
assert response is None
captured = capsys.readouterr()
assert expected_message in captured.out
assert isinstance(response, ErrorResponse)
assert expected_message in response.message
class TestCallerIntegration:
@@ -599,7 +629,8 @@ class TestCallerEdgeCases:
headers={},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.post')
@@ -659,7 +690,8 @@ class TestCallerEdgeCases:
headers={},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
@patch('corelibs.requests_handling.caller.requests.get')
@@ -679,7 +711,8 @@ class TestCallerEdgeCases:
headers={},
timeout=20,
verify=True,
proxies=None
proxies=None,
cert=None
)
def test_timeout_zero(self):
@@ -730,9 +763,10 @@ class TestCallerProxyHandling:
mock_response = Mock(spec=requests.Response)
mock_get.return_value = mock_response
proxy = {
"http": "http://proxy.example.com:8080",
"https": "https://proxy.example.com:8080"
proxy: ProxyConfig = {
"type": "socks5",
"host": "proxy.example.com:8080",
"port": "8080"
}
caller = Caller(header={}, proxy=proxy)
caller.get("https://api.example.com/data")
@@ -746,9 +780,10 @@ class TestCallerProxyHandling:
mock_response = Mock(spec=requests.Response)
mock_post.return_value = mock_response
proxy = {
"http": "http://user:pass@proxy.example.com:8080",
"https": "https://user:pass@proxy.example.com:8080"
proxy: ProxyConfig = {
"type": "socks5",
"host": "proxy.example.com:8080",
"port": "8080"
}
caller = Caller(header={}, proxy=proxy)
caller.post("https://api.example.com/data", data={"test": "data"})
@@ -789,7 +824,7 @@ class TestCallerResponseHandling:
caller = Caller(header={})
response = caller.get("https://api.example.com/data")
assert response is not None
assert not isinstance(response, ErrorResponse)
assert response.status_code == 200
assert response.text == "Success"
assert response.json() == {"status": "ok"}
@@ -805,7 +840,7 @@ class TestCallerResponseHandling:
caller = Caller(header={})
response = caller.get("https://api.example.com/data")
assert response is not None
assert not isinstance(response, ErrorResponse)
assert response.status_code == status_code