Compare commits
33 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db6a3b53c5 | ||
|
|
82b089498e | ||
|
|
948b0dd5e7 | ||
|
|
4acc0b51b1 | ||
|
|
a626b738a9 | ||
|
|
7119844313 | ||
|
|
5763f57830 | ||
|
|
70e8ceecce | ||
|
|
acbe1ac692 | ||
|
|
99bca2c467 | ||
|
|
b74ed1f30e | ||
|
|
8082ab78a1 | ||
|
|
c69076f517 | ||
|
|
648ab001b6 | ||
|
|
447034046e | ||
|
|
0770ac0bb4 | ||
|
|
aa2fbd4f70 | ||
|
|
58c8447531 | ||
|
|
bcca43d774 | ||
|
|
e9ccfe7ad2 | ||
|
|
6c2637ad34 | ||
|
|
7183d05dd6 | ||
|
|
b45ca85cd3 | ||
|
|
4ca45ebc73 | ||
|
|
6902768fed | ||
|
|
3f9f2ceaac | ||
|
|
2a248bd249 | ||
|
|
c559a6bafb | ||
|
|
19d7e9b5ed | ||
|
|
3e5a5accf7 | ||
|
|
424c91945a | ||
|
|
c657dc564e | ||
|
|
208f002284 |
@@ -12,11 +12,12 @@ This is a pip package that can be installed into any project and covers the foll
|
||||
|
||||
## Current list
|
||||
|
||||
- config_handling: simple INI config file data loader with check/convert/etc
|
||||
- csv_handling: csv dict writer helper
|
||||
- debug_handling: various debug helpers like data dumper, timer, utilization, etc
|
||||
- file_handling: crc handling for file content and file names, progress bar
|
||||
- json_handling: jmespath support and json date support
|
||||
- list_dict_handling: list and dictionary handling support (search, fingerprinting, etc)
|
||||
- iterator_handling: list and dictionary handling support (search, fingerprinting, etc)
|
||||
- logging_handling: extend log and also error message handling
|
||||
- requests_handling: requests wrapper for better calls with auth headers
|
||||
- script_handling: pid lock file handling, abort timer
|
||||
|
||||
5
ToDo.md
5
ToDo.md
@@ -1,4 +1,5 @@
|
||||
# ToDo list
|
||||
|
||||
- stub files .pyi
|
||||
- fix all remaning check errors
|
||||
- [ ] stub files .pyi
|
||||
- [ ] Add tests for all, we need 100% test coverate
|
||||
- [ ] Log: add custom format for "stack_correct" if set, this will override the normal stack block
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# MARK: Project info
|
||||
[project]
|
||||
name = "corelibs"
|
||||
version = "0.10.0"
|
||||
version = "0.13.1"
|
||||
description = "Collection of utils for Python scripts"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
@@ -53,6 +53,10 @@ notes = ["FIXME", "TODO"]
|
||||
notes-rgx = '(FIXME|TODO)(\((TTD-|#)\[0-9]+\))'
|
||||
[tool.flake8]
|
||||
max-line-length = 120
|
||||
ignore = [
|
||||
"E741", # ignore ambigious variable name
|
||||
"W504" # Line break occurred after a binary operator [wrong triggered by "or" in if]
|
||||
]
|
||||
[tool.pylint.MASTER]
|
||||
# this is for the tests/etc folders
|
||||
init-hook='import sys; sys.path.append("src/")'
|
||||
|
||||
14
src/corelibs/check_handling/regex_constants.py
Normal file
14
src/corelibs/check_handling/regex_constants.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""
|
||||
List of regex compiled strings that can be used
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
|
||||
EMAIL_REGEX_BASIC = r"""
|
||||
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
||||
"""
|
||||
EMAIL_REGEX_BASIC_COMPILED = re.compile(EMAIL_REGEX_BASIC)
|
||||
|
||||
# __END__
|
||||
563
src/corelibs/config_handling/settings_loader.py
Normal file
563
src/corelibs/config_handling/settings_loader.py
Normal file
@@ -0,0 +1,563 @@
|
||||
"""
|
||||
Load settings file for a certain group
|
||||
Check data for existing and valid
|
||||
Additional check for override settings as arguments
|
||||
"""
|
||||
|
||||
import re
|
||||
import configparser
|
||||
from typing import Any, Tuple, Sequence, cast
|
||||
from pathlib import Path
|
||||
from corelibs.logging_handling.log import Log
|
||||
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list
|
||||
from corelibs.var_handling.var_helpers import is_int, is_float, str_to_bool
|
||||
from corelibs.config_handling.settings_loader_handling.settings_loader_check import SettingsLoaderCheck
|
||||
|
||||
|
||||
class SettingsLoader:
|
||||
"""
|
||||
Settings Loader with Argument parser
|
||||
"""
|
||||
|
||||
# split char
|
||||
DEFAULT_ELEMENT_SPLIT_CHAR: str = ','
|
||||
|
||||
CONVERT_TO_LIST: list[str] = ['str', 'int', 'float', 'bool', 'auto']
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
args: dict[str, Any],
|
||||
config_file: Path,
|
||||
log: 'Log | None' = None,
|
||||
always_print: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
init the Settings loader
|
||||
|
||||
Args:
|
||||
args (dict): Script Arguments
|
||||
config_file (Path): config file including path
|
||||
log (Log | None): Lop class, if set errors are written to this
|
||||
always_print (bool): Set to true to always print errors, even if Log is available
|
||||
element_split_char (str): Split character, default is ','
|
||||
|
||||
Raises:
|
||||
ValueError: _description_
|
||||
"""
|
||||
self.args = args
|
||||
self.config_file = config_file
|
||||
self.log = log
|
||||
self.always_print = always_print
|
||||
# entries that have to be split
|
||||
self.entry_split_char: dict[str, str] = {}
|
||||
# entries that should be converted
|
||||
self.entry_convert: dict[str, str] = {}
|
||||
# default set entries
|
||||
self.entry_set_empty: dict[str, str | None] = {}
|
||||
# config parser, load config file first
|
||||
self.config_parser: configparser.ConfigParser | None = self.__load_config_file()
|
||||
# all settings
|
||||
self.settings: dict[str, dict[str, None | str | int | float | bool]] | None = None
|
||||
# remove file name and get base path and check
|
||||
if not self.config_file.parent.is_dir():
|
||||
raise ValueError(f"Cannot find the config folder: {self.config_file.parent}")
|
||||
# for check settings, abort flag
|
||||
self._check_settings_abort: bool = False
|
||||
|
||||
# MARK: load settings
|
||||
def load_settings(
|
||||
self,
|
||||
config_id: str,
|
||||
config_validate: dict[str, list[str]],
|
||||
allow_not_exist: bool = False
|
||||
) -> dict[str, str]:
|
||||
"""
|
||||
neutral settings loader
|
||||
|
||||
The settings values on the right side are seen as a list if they have "," inside (see ELEMENT SPLIT CHAR)
|
||||
but only if the "check:list." is set
|
||||
|
||||
for the allowe entries set, each set is "key => checks", check set is "check type:settings"
|
||||
key: the key name in the settings file
|
||||
check: check set with the following allowed entries on the left side for type
|
||||
- mandatory: must be set as "mandatory:yes", if the key entry is missing or empty throws error
|
||||
- check: see __check_settings for the settings currently available
|
||||
- matching: a | list of entries where the value has to match too
|
||||
- in: the right side is another KEY value from the settings where this value must be inside
|
||||
- split: character to split entries, if set check:list+ must be set if checks are needed
|
||||
- convert: convert to int, float -> if element is number convert, else leave as is
|
||||
- empty: convert empty to, if nothing set on the right side then convert to None type
|
||||
|
||||
TODO: there should be a config/options argument for general settings
|
||||
|
||||
Args:
|
||||
config_id (str): what block to load
|
||||
config_validate (dict[str, list[str]]): list of allowed entries sets
|
||||
allow_not_exist (bool): If set to True, does not throw an error, but returns empty set
|
||||
|
||||
Returns:
|
||||
dict[str, str]: key = value list
|
||||
"""
|
||||
settings: dict[str, dict[str, Any]] = {
|
||||
config_id: {},
|
||||
}
|
||||
if self.config_parser is not None:
|
||||
try:
|
||||
# load all data as is, validation is done afterwards
|
||||
settings[config_id] = dict(self.config_parser[config_id])
|
||||
except KeyError as e:
|
||||
if allow_not_exist is True:
|
||||
return {}
|
||||
raise ValueError(self.__print(
|
||||
f"[!] Cannot read [{config_id}] block in the {self.config_file}: {e}",
|
||||
'CRITICAL'
|
||||
)) from e
|
||||
try:
|
||||
for key, checks in config_validate.items():
|
||||
skip = True
|
||||
split_char = self.DEFAULT_ELEMENT_SPLIT_CHAR
|
||||
# if one is set as list in check -> do not skip, but add to list
|
||||
for check in checks:
|
||||
if check.startswith("convert:"):
|
||||
try:
|
||||
[_, convert_to] = check.split(":")
|
||||
if convert_to not in self.CONVERT_TO_LIST:
|
||||
raise ValueError(self.__print(
|
||||
f"[!] In [{config_id}] the convert type is invalid {check}: {convert_to}",
|
||||
'CRITICAL'
|
||||
))
|
||||
self.entry_convert[key] = convert_to
|
||||
except ValueError as e:
|
||||
raise ValueError(self.__print(
|
||||
f"[!] In [{config_id}] the convert type setup for entry failed: {check}: {e}",
|
||||
'CRITICAL'
|
||||
)) from e
|
||||
if check.startswith('empty:'):
|
||||
try:
|
||||
[_, empty_set] = check.split(":")
|
||||
if not empty_set:
|
||||
empty_set = None
|
||||
self.entry_set_empty[key] = empty_set
|
||||
except ValueError as e:
|
||||
print(f"VALUE ERROR: {key}")
|
||||
raise ValueError(self.__print(
|
||||
f"[!] In [{config_id}] the empty set type for entry failed: {check}: {e}",
|
||||
'CRITICAL'
|
||||
)) from e
|
||||
# split char, also check to not set it twice, first one only
|
||||
if check.startswith("split:") and not self.entry_split_char.get(key):
|
||||
try:
|
||||
[_, split_char] = check.split(":")
|
||||
if len(split_char) == 0:
|
||||
self.__print(
|
||||
(
|
||||
f"[*] In [{config_id}] the [{key}] split char character is empty, "
|
||||
f"fallback to: {self.DEFAULT_ELEMENT_SPLIT_CHAR}"
|
||||
),
|
||||
"WARNING"
|
||||
)
|
||||
split_char = self.DEFAULT_ELEMENT_SPLIT_CHAR
|
||||
self.entry_split_char[key] = split_char
|
||||
skip = False
|
||||
except ValueError as e:
|
||||
raise ValueError(self.__print(
|
||||
f"[!] In [{config_id}] the split character setup for entry failed: {check}: {e}",
|
||||
'CRITICAL'
|
||||
)) from e
|
||||
if skip:
|
||||
continue
|
||||
settings[config_id][key] = [
|
||||
__value.replace(" ", "")
|
||||
for __value in settings[config_id][key].split(split_char)
|
||||
]
|
||||
except KeyError as e:
|
||||
raise ValueError(self.__print(
|
||||
f"[!] Cannot read [{config_id}] block because the entry [{e}] could not be found",
|
||||
'CRITICAL'
|
||||
)) from e
|
||||
else:
|
||||
# ignore error if arguments are set
|
||||
if not self.__check_arguments(config_validate, True):
|
||||
raise ValueError(self.__print(f"[!] Cannot find file: {self.config_file}", 'CRITICAL'))
|
||||
else:
|
||||
# base set
|
||||
settings[config_id] = {}
|
||||
# make sure all are set
|
||||
# if we have arguments set, this override config settings
|
||||
error: bool = False
|
||||
for entry, validate in config_validate.items():
|
||||
# if we have command line option set, this one overrides config
|
||||
if self.__get_arg(entry):
|
||||
self.__print(f"[*] Command line option override for: {entry}", 'WARNING')
|
||||
settings[config_id][entry] = self.args.get(entry)
|
||||
# validate checks
|
||||
for check in validate:
|
||||
# CHECKS
|
||||
# - mandatory
|
||||
# - check: regex check (see SettingsLoaderCheck class for entries)
|
||||
# - matching: entry in given list
|
||||
# - in: entry in other setting entry list
|
||||
# - length: for string length
|
||||
# - range: for int/float range check
|
||||
# mandatory check
|
||||
if check == "mandatory:yes" and (
|
||||
not settings[config_id].get(entry) or settings[config_id].get(entry) == ['']
|
||||
):
|
||||
error = True
|
||||
self.__print(f"[!] Missing content entry for: {entry}", 'ERROR')
|
||||
# skip if empty none
|
||||
if settings[config_id].get(entry) is None:
|
||||
continue
|
||||
if check.startswith("check:"):
|
||||
# replace the check and run normal checks
|
||||
settings[config_id][entry] = self.__check_settings(
|
||||
check, entry, settings[config_id][entry]
|
||||
)
|
||||
if self._check_settings_abort is True:
|
||||
error = True
|
||||
elif check.startswith("matching:"):
|
||||
checks = check.replace("matching:", "").split("|")
|
||||
if __result := is_list_in_list(convert_to_list(settings[config_id][entry]), list(checks)):
|
||||
error = True
|
||||
self.__print(f"[!] [{entry}] '{__result}' not matching {checks}", 'ERROR')
|
||||
elif check.startswith("in:"):
|
||||
check = check.replace("in:", "")
|
||||
# skip if check does not exist, and set error
|
||||
if settings[config_id].get(check) is None:
|
||||
error = True
|
||||
self.__print(f"[!] [{entry}] '{check}' target does not exist", 'ERROR')
|
||||
continue
|
||||
# entry must be in check entry
|
||||
# in for list, else equal with convert to string
|
||||
if (
|
||||
__result := is_list_in_list(
|
||||
convert_to_list(settings[config_id][entry]),
|
||||
__checks := convert_to_list(settings[config_id][check])
|
||||
)
|
||||
):
|
||||
self.__print(f"[!] [{entry}] '{__result}' must be in the '{__checks}' values list", 'ERROR')
|
||||
error = True
|
||||
elif check.startswith('length:'):
|
||||
check = check.replace("length:", "")
|
||||
# length can be: n, n-, n-m, -m
|
||||
# as: equal, >= >=< =<
|
||||
self.__build_from_to_equal(entry, check)
|
||||
if not self.__length_range_validate(
|
||||
entry,
|
||||
'length',
|
||||
cast(list[str], convert_to_list(settings[config_id][entry])),
|
||||
self.__build_from_to_equal(entry, check, convert_to_int=True)
|
||||
):
|
||||
error = True
|
||||
elif check.startswith('range:'):
|
||||
check = check.replace("range:", "")
|
||||
if not self.__length_range_validate(
|
||||
entry,
|
||||
'range',
|
||||
cast(list[str], convert_to_list(settings[config_id][entry])),
|
||||
self.__build_from_to_equal(entry, check)
|
||||
):
|
||||
error = True
|
||||
# after post clean up if we have empty entries and we are mandatory
|
||||
if check == "mandatory:yes" and (
|
||||
not settings[config_id].get(entry) or settings[config_id].get(entry) == ['']
|
||||
):
|
||||
error = True
|
||||
self.__print(f"[!] Missing content entry for: {entry}", 'ERROR')
|
||||
if error is True:
|
||||
raise ValueError(self.__print("[!] Missing or incorrect settings data. Cannot proceed", 'CRITICAL'))
|
||||
# set empty
|
||||
for [entry, empty_set] in self.entry_set_empty.items():
|
||||
# if set, skip, else set to empty value
|
||||
if settings[config_id].get(entry) or isinstance(settings[config_id].get(entry), list):
|
||||
continue
|
||||
settings[config_id][entry] = empty_set
|
||||
# Convert input
|
||||
for [entry, convert_type] in self.entry_convert.items():
|
||||
if convert_type in ["int", "any"] and is_int(settings[config_id][entry]):
|
||||
settings[config_id][entry] = int(settings[config_id][entry])
|
||||
elif convert_type in ["float", "any"] and is_float(settings[config_id][entry]):
|
||||
settings[config_id][entry] = float(settings[config_id][entry])
|
||||
elif convert_type in ["bool", "any"] and (
|
||||
settings[config_id][entry] == "true" or
|
||||
settings[config_id][entry] == "True" or
|
||||
settings[config_id][entry] == "false" or
|
||||
settings[config_id][entry] == "False"
|
||||
):
|
||||
try:
|
||||
settings[config_id][entry] = str_to_bool(settings[config_id][entry])
|
||||
except ValueError:
|
||||
self.__print(
|
||||
f"[!] Could not convert to boolean for '{entry}': {settings[config_id][entry]}",
|
||||
'ERROR'
|
||||
)
|
||||
# string is always string
|
||||
# TODO: empty and int/float/bool: set to none?
|
||||
|
||||
return settings[config_id]
|
||||
|
||||
# MARK: build from/to/requal logic
|
||||
def __build_from_to_equal(
|
||||
self, entry: str, check: str, convert_to_int: bool = False
|
||||
) -> Tuple[float | None, float | None, float | None]:
|
||||
"""
|
||||
split out the "n-m" part to get the to/from/equal
|
||||
|
||||
Arguments:
|
||||
entry {str} -- _description_
|
||||
check {str} -- _description_
|
||||
|
||||
Returns:
|
||||
Tuple[float | None, float | None, float | None] -- _description_
|
||||
|
||||
Throws:
|
||||
ValueError if range/length entries are not float
|
||||
"""
|
||||
__from = None
|
||||
__to = None
|
||||
__equal = None
|
||||
try:
|
||||
[__from, __to] = check.split('-')
|
||||
if (__from and not is_float(__from)) or (__to and not is_float(__to)):
|
||||
raise ValueError(self.__print(
|
||||
f"[{entry}] Check value for length is not in: {check}",
|
||||
'CRITICAL'
|
||||
))
|
||||
if len(__from) == 0:
|
||||
__from = None
|
||||
if len(__to) == 0:
|
||||
__to = None
|
||||
except ValueError as e:
|
||||
if not is_float(__equal := check):
|
||||
raise ValueError(self.__print(
|
||||
f"[{entry}] Check value for length is not a valid integer: {check}",
|
||||
'CRITICAL'
|
||||
)) from e
|
||||
if len(__equal) == 0:
|
||||
__equal = None
|
||||
# makre sure this is all int or None
|
||||
if __from is not None:
|
||||
__from = int(__from) if convert_to_int else float(__from)
|
||||
if __to is not None:
|
||||
__to = int(__to) if convert_to_int else float(__to)
|
||||
if __equal is not None:
|
||||
__equal = int(__equal) if convert_to_int else float(__equal)
|
||||
return (
|
||||
__from,
|
||||
__to,
|
||||
__equal
|
||||
)
|
||||
|
||||
# MARK: length/range validation
|
||||
def __length_range_validate(
|
||||
self,
|
||||
entry: str,
|
||||
check_type: str,
|
||||
values: Sequence[str | int | float],
|
||||
check: Tuple[float | None, float | None, float | None],
|
||||
) -> bool:
|
||||
(__from, __to, __equal) = check
|
||||
valid = True
|
||||
for value_raw in convert_to_list(values):
|
||||
# skip no tset values for range check
|
||||
if not value_raw:
|
||||
continue
|
||||
value = 0
|
||||
error_mark = ''
|
||||
if check_type == 'length':
|
||||
error_mark = 'length'
|
||||
value = len(str(value_raw))
|
||||
elif check_type == 'range':
|
||||
error_mark = 'range'
|
||||
value = float(str(value_raw))
|
||||
if __equal is not None and value != __equal:
|
||||
self.__print(f"[!] [{entry}] '{value_raw}' {error_mark} does not match {__equal}", 'ERROR')
|
||||
valid = False
|
||||
continue
|
||||
if __from is not None and __to is None and value < __from:
|
||||
self.__print(f"[!] [{entry}] '{value_raw}' {error_mark} smaller than minimum {__from}", 'ERROR')
|
||||
valid = False
|
||||
continue
|
||||
if __from is None and __to is not None and value > __to:
|
||||
self.__print(f"[!] [{entry}] '{value_raw}' {error_mark} larger than maximum {__to}", 'ERROR')
|
||||
valid = False
|
||||
continue
|
||||
if __from is not None and __to is not None and (
|
||||
value < __from or value > __to
|
||||
):
|
||||
self.__print(
|
||||
f"[!] [{entry}] '{value_raw}' {error_mark} outside valid range {__from} to {__to}",
|
||||
'ERROR'
|
||||
)
|
||||
valid = False
|
||||
continue
|
||||
return valid
|
||||
|
||||
# MARK: load config file data from file
|
||||
def __load_config_file(self) -> configparser.ConfigParser | None:
|
||||
"""
|
||||
load and parse the config file
|
||||
if not loadable return None
|
||||
"""
|
||||
config = configparser.ConfigParser()
|
||||
if self.config_file.is_file():
|
||||
config.read(self.config_file)
|
||||
return config
|
||||
return None
|
||||
|
||||
# MARK: regex clean up one
|
||||
def __clean_invalid_setting(
|
||||
self,
|
||||
entry: str,
|
||||
validate: str,
|
||||
value: str,
|
||||
regex: str,
|
||||
regex_clean: str | None,
|
||||
replace: str = "",
|
||||
print_error: bool = True,
|
||||
) -> str:
|
||||
"""
|
||||
check is a string is invalid, print optional error message and clean up string
|
||||
|
||||
Args:
|
||||
entry (str): what entry key
|
||||
validate (str): validate type
|
||||
value (str): the value to check against
|
||||
regex (str): regex used for checking as r'...'
|
||||
regex_clean (str): regex used for cleaning as r'...'
|
||||
replace (str): replace with character. Defaults to ''
|
||||
print_error (bool): print the error message. Defaults to True
|
||||
"""
|
||||
check = re.compile(regex, re.VERBOSE)
|
||||
clean: re.Pattern[str] | None = None
|
||||
if regex_clean is not None:
|
||||
clean = re.compile(regex_clean, re.VERBOSE)
|
||||
# value must be set if clean is None, else empty value is allowed and will fail
|
||||
if (clean is None and value or clean) and not check.search(value):
|
||||
self.__print(
|
||||
f"[!] Invalid content for '{entry}' with check '{validate}' and data: {value}",
|
||||
'ERROR', print_error
|
||||
)
|
||||
# clean up if clean up is not none, else return EMPTY string
|
||||
if clean is not None:
|
||||
return clean.sub(replace, value)
|
||||
self._check_settings_abort = True
|
||||
return ''
|
||||
# else return as is
|
||||
return value
|
||||
|
||||
# MARK: check settings, regx
|
||||
def __check_settings(
|
||||
self,
|
||||
check: str, entry: str, setting_value: list[str] | str
|
||||
) -> list[str] | str:
|
||||
"""
|
||||
check each setting valid
|
||||
The settings are defined in the SettingsLoaderCheck class
|
||||
|
||||
Args:
|
||||
check (str): What check to run
|
||||
entry (str): Variable name, just for information message
|
||||
setting_value (list[str | int] | str | int): settings value data
|
||||
entry_split_char (str | None): split char, for list check
|
||||
|
||||
Returns:
|
||||
list[str | int] |111 str | int: cleaned up settings value data
|
||||
"""
|
||||
check = check.replace("check:", "")
|
||||
# get the check settings
|
||||
__check_settings = SettingsLoaderCheck.CHECK_SETTINGS.get(check)
|
||||
if __check_settings is None:
|
||||
raise ValueError(self.__print(
|
||||
f"[{entry}] Cannot get SettingsLoaderCheck.CHECK_SETTINGS for {check}",
|
||||
'CRITICAL'
|
||||
))
|
||||
# either removes or replaces invalid characters in the list
|
||||
if isinstance(setting_value, list):
|
||||
# clean up invalid characters
|
||||
# loop over result and keep only filled (strip empty)
|
||||
setting_value = [e for e in [
|
||||
self.__clean_invalid_setting(
|
||||
entry, check, str(__entry),
|
||||
__check_settings['regex'], __check_settings['regex_clean'], __check_settings['replace']
|
||||
)
|
||||
for __entry in setting_value
|
||||
] if e]
|
||||
else:
|
||||
setting_value = self.__clean_invalid_setting(
|
||||
entry, check, str(setting_value),
|
||||
__check_settings['regex'], __check_settings['regex_clean'], __check_settings['replace']
|
||||
)
|
||||
# else:
|
||||
# self.__print(f"[!] Unkown type to check", 'ERROR)
|
||||
# return data
|
||||
return setting_value
|
||||
|
||||
# MARK: check arguments, for config file load fail
|
||||
def __check_arguments(self, arguments: dict[str, list[str]], all_set: bool = False) -> bool:
|
||||
"""
|
||||
check if ast least one argument is set
|
||||
|
||||
Args:
|
||||
arguments (list[str]): _description_
|
||||
|
||||
Returns:
|
||||
bool: _description_
|
||||
"""
|
||||
count_set = 0
|
||||
count_arguments = 0
|
||||
has_argument = False
|
||||
for argument, validate in arguments.items():
|
||||
# if argument is mandatory add to count, if not mandatory set has "has" to skip error
|
||||
mandatory = any(entry == "mandatory:yes" for entry in validate)
|
||||
if not mandatory:
|
||||
has_argument = True
|
||||
continue
|
||||
count_arguments += 1
|
||||
if self.__get_arg(argument):
|
||||
has_argument = True
|
||||
count_set += 1
|
||||
# for all set, True only if all are set
|
||||
if all_set is True:
|
||||
has_argument = count_set == count_arguments
|
||||
|
||||
return has_argument
|
||||
|
||||
# MARK: get argument from args dict
|
||||
def __get_arg(self, entry: str) -> Any:
|
||||
"""
|
||||
check if an argument entry xists, if None -> returns None else value of argument
|
||||
|
||||
Arguments:
|
||||
entry {str} -- _description_
|
||||
|
||||
Returns:
|
||||
Any -- _description_
|
||||
"""
|
||||
if self.args.get(entry) is None:
|
||||
return None
|
||||
return self.args.get(entry)
|
||||
|
||||
# MARK: error print
|
||||
def __print(self, msg: str, level: str, print_error: bool = True) -> str:
|
||||
"""
|
||||
print out error, if Log class is set then print to log instead
|
||||
|
||||
Arguments:
|
||||
msg {str} -- _description_
|
||||
level {str} -- _description_
|
||||
|
||||
Keyword Arguments:
|
||||
print_error {bool} -- _description_ (default: {True})
|
||||
"""
|
||||
if self.log is not None:
|
||||
if not Log.validate_log_level(level):
|
||||
level = 'ERROR'
|
||||
self.log.logger.log(Log.get_log_level_int(level), msg, stacklevel=2)
|
||||
if self.log is None or self.always_print:
|
||||
if print_error:
|
||||
print(msg)
|
||||
return msg
|
||||
|
||||
|
||||
# __END__
|
||||
@@ -0,0 +1,81 @@
|
||||
"""
|
||||
Class of checks that can be run on value entries
|
||||
"""
|
||||
|
||||
from typing import TypedDict
|
||||
from corelibs.check_handling.regex_constants import EMAIL_REGEX_BASIC
|
||||
|
||||
|
||||
class SettingsLoaderCheckValue(TypedDict):
|
||||
"""Settings check entries"""
|
||||
|
||||
regex: str
|
||||
# if None, then on error we exit, eles we clean up data
|
||||
regex_clean: str | None
|
||||
replace: str
|
||||
|
||||
|
||||
class SettingsLoaderCheck:
|
||||
"""
|
||||
check:<NAME> or check:list+<NAME>
|
||||
"""
|
||||
|
||||
CHECK_SETTINGS: dict[str, SettingsLoaderCheckValue] = {
|
||||
"int": {
|
||||
"regex": r"^[0-9]+$",
|
||||
"regex_clean": r"[^0-9]",
|
||||
"replace": "",
|
||||
},
|
||||
"string.alphanumeric": {
|
||||
"regex": r"^[a-zA-Z0-9]+$",
|
||||
"regex_clean": r"[^a-zA-Z0-9]",
|
||||
"replace": "",
|
||||
},
|
||||
"string.alphanumeric.lower.dash": {
|
||||
"regex": r"^[a-z0-9-]+$",
|
||||
"regex_clean": r"[^a-z0-9-]",
|
||||
"replace": "",
|
||||
},
|
||||
# A-Z a-z 0-9 _ - . ONLY
|
||||
# This one does not remove, but replaces with _
|
||||
"string.alphanumeric.extended.replace": {
|
||||
"regex": r"^[_.a-zA-Z0-9-]+$",
|
||||
"regex_clean": r"[^_.a-zA-Z0-9-]",
|
||||
"replace": "_",
|
||||
},
|
||||
# This does a baisc email check, only alphanumeric with special characters
|
||||
"string.email.basic": {
|
||||
"regex": EMAIL_REGEX_BASIC,
|
||||
"regex_clean": None,
|
||||
"replace": "",
|
||||
},
|
||||
# Domain check, including localhost no port
|
||||
"string.domain.with-localhost": {
|
||||
"regex": r"^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})$",
|
||||
"regex_clean": None,
|
||||
"replace": "",
|
||||
},
|
||||
# Domain check, with localhost and port
|
||||
"string.domain.with-localhost.port": {
|
||||
"regex": r"""
|
||||
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})(?::\d+)?$
|
||||
""",
|
||||
"regex_clean": None,
|
||||
"replace": "",
|
||||
},
|
||||
# Domain check, no pure localhost allowed
|
||||
"string.domain": {
|
||||
"regex": r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$",
|
||||
"regex_clean": None,
|
||||
"replace": "",
|
||||
},
|
||||
# Basic date check, does not validate date itself
|
||||
"string.date": {
|
||||
"regex": r"^\d{4}[/-]\d{1,2}[/-]\d{1,2}$",
|
||||
"regex_clean": None,
|
||||
"replace": "",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# __END__
|
||||
33
src/corelibs/debug_handling/debug_helpers.py
Normal file
33
src/corelibs/debug_handling/debug_helpers.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""
|
||||
Various debug helpers
|
||||
"""
|
||||
|
||||
import traceback
|
||||
import os
|
||||
|
||||
|
||||
def traceback_call_str(start: int = 2, depth: int = 1):
|
||||
"""
|
||||
get the trace for the last entry
|
||||
|
||||
Keyword Arguments:
|
||||
start {int} -- _description_ (default: {2})
|
||||
depth {int} -- _description_ (default: {1})
|
||||
|
||||
Returns:
|
||||
_type_ -- _description_
|
||||
"""
|
||||
# can't have more than in the stack for depth
|
||||
depth = min(depth, start)
|
||||
depth = start - depth
|
||||
# 0 is full stack length from start
|
||||
if depth == 0:
|
||||
stack = traceback.extract_stack()[-start:]
|
||||
else:
|
||||
stack = traceback.extract_stack()[-start:-depth]
|
||||
return ' -> '.join(
|
||||
f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}"
|
||||
for f in stack
|
||||
)
|
||||
|
||||
# __END__
|
||||
0
src/corelibs/iterator_handling/__init__.py
Normal file
0
src/corelibs/iterator_handling/__init__.py
Normal file
@@ -3,6 +3,9 @@ Dict helpers
|
||||
"""
|
||||
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def mask(
|
||||
data_set: dict[str, str],
|
||||
mask_keys: list[str] | None = None,
|
||||
@@ -34,4 +37,22 @@ def mask(
|
||||
for key, value in data_set.items()
|
||||
}
|
||||
|
||||
|
||||
def set_entry(dict_set: dict[str, Any], key: str, value_set: Any) -> dict[str, Any]:
|
||||
"""
|
||||
set a new entry in the dict set
|
||||
|
||||
Arguments:
|
||||
key {str} -- _description_
|
||||
dict_set {dict[str, Any]} -- _description_
|
||||
value_set {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
dict[str, Any] -- _description_
|
||||
"""
|
||||
if not dict_set.get(key):
|
||||
dict_set[key] = {}
|
||||
dict_set[key] = value_set
|
||||
return dict_set
|
||||
|
||||
# __END__
|
||||
47
src/corelibs/iterator_handling/list_helpers.py
Normal file
47
src/corelibs/iterator_handling/list_helpers.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""
|
||||
List type helpers
|
||||
"""
|
||||
|
||||
from typing import Any, Sequence
|
||||
|
||||
|
||||
def convert_to_list(
|
||||
entry: str | int | float | bool | Sequence[str | int | float | bool | Sequence[Any]]
|
||||
) -> Sequence[str | int | float | bool | Sequence[Any]]:
|
||||
"""
|
||||
Convert any of the non list values (except dictionary) to a list
|
||||
|
||||
Arguments:
|
||||
entry {str | int | float | bool | list[str | int | float | bool]} -- _description_
|
||||
|
||||
Returns:
|
||||
list[str | int | float | bool] -- _description_
|
||||
"""
|
||||
if isinstance(entry, list):
|
||||
return entry
|
||||
return [entry]
|
||||
|
||||
|
||||
def is_list_in_list(
|
||||
list_a: Sequence[str | int | float | bool | Sequence[Any]],
|
||||
list_b: Sequence[str | int | float | bool | Sequence[Any]]
|
||||
) -> Sequence[str | int | float | bool | Sequence[Any]]:
|
||||
"""
|
||||
Return entries from list_a that are not in list_b
|
||||
Type safe compare
|
||||
|
||||
Arguments:
|
||||
list_a {list[Any]} -- _description_
|
||||
list_b {list[Any]} -- _description_
|
||||
|
||||
Returns:
|
||||
list[Any] -- _description_
|
||||
"""
|
||||
# Create sets of (value, type) tuples
|
||||
set_a = set((item, type(item)) for item in list_a)
|
||||
set_b = set((item, type(item)) for item in list_b)
|
||||
|
||||
# Get the difference and extract just the values
|
||||
return [item for item, _ in set_a - set_b]
|
||||
|
||||
# __END__
|
||||
@@ -7,14 +7,18 @@ attach "init_worker_logging" with the set log_queue
|
||||
import re
|
||||
import logging.handlers
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Mapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
||||
from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||
from corelibs.string_handling.text_colors import Colors
|
||||
from corelibs.debug_handling.debug_helpers import traceback_call_str
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from multiprocessing import Queue
|
||||
|
||||
|
||||
# MARK: Log settings TypedDict
|
||||
class LogSettings(TypedDict):
|
||||
"""
|
||||
log settings
|
||||
@@ -22,8 +26,8 @@ class LogSettings(TypedDict):
|
||||
Arguments:
|
||||
TypedDict {_type_} -- _description_
|
||||
"""
|
||||
log_level_console: str
|
||||
log_level_file: str
|
||||
log_level_console: LoggingLevel
|
||||
log_level_file: LoggingLevel
|
||||
console_enabled: bool
|
||||
console_color_output_enabled: bool
|
||||
add_start_info: bool
|
||||
@@ -31,18 +35,21 @@ class LogSettings(TypedDict):
|
||||
log_queue: 'Queue[str] | None'
|
||||
|
||||
|
||||
# MARK: Custom color filter
|
||||
class CustomConsoleFormatter(logging.Formatter):
|
||||
"""
|
||||
Custom formatter with colors for console output
|
||||
"""
|
||||
|
||||
COLORS = {
|
||||
"DEBUG": Colors.cyan,
|
||||
"INFO": Colors.green,
|
||||
"WARNING": Colors.yellow,
|
||||
"ERROR": Colors.red,
|
||||
"CRITICAL": Colors.magenta,
|
||||
"EXCEPTION": Colors.magenta_bold,
|
||||
LoggingLevel.DEBUG.name: Colors.cyan,
|
||||
LoggingLevel.INFO.name: Colors.green,
|
||||
LoggingLevel.WARNING.name: Colors.yellow,
|
||||
LoggingLevel.ERROR.name: Colors.red,
|
||||
LoggingLevel.CRITICAL.name: Colors.red_bold,
|
||||
LoggingLevel.ALERT.name: Colors.yellow_bold,
|
||||
LoggingLevel.EMERGENCY.name: Colors.magenta_bold,
|
||||
LoggingLevel.EXCEPTION.name: Colors.magenta_bright, # will never be written to console
|
||||
}
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
@@ -59,7 +66,7 @@ class CustomConsoleFormatter(logging.Formatter):
|
||||
reset = Colors.reset
|
||||
color = self.COLORS.get(record.levelname, reset)
|
||||
# only highlight level for basic
|
||||
if record.levelname in ['DEBUG', 'INFO']:
|
||||
if record.levelname in [LoggingLevel.DEBUG.name, LoggingLevel.INFO.name]:
|
||||
record.levelname = f"{color}{record.levelname}{reset}"
|
||||
return super().format(record)
|
||||
# highlight whole line
|
||||
@@ -67,37 +74,46 @@ class CustomConsoleFormatter(logging.Formatter):
|
||||
return f"{color}{message}{reset}"
|
||||
|
||||
|
||||
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
|
||||
# hasattr(record, 'stack_trace')
|
||||
|
||||
|
||||
# MARK: Log class
|
||||
class Log:
|
||||
"""
|
||||
logger setup
|
||||
"""
|
||||
|
||||
# exception level
|
||||
EXCEPTION: int = 60
|
||||
# spacer lenght characters and the character
|
||||
SPACER_CHAR: str = '='
|
||||
SPACER_LENGTH: int = 32
|
||||
# default logging level
|
||||
DEFAULT_LOG_LEVEL: str = 'WARNING'
|
||||
DEFAULT_LOG_LEVEL: LoggingLevel = LoggingLevel.WARNING
|
||||
DEFAULT_LOG_LEVEL_FILE: LoggingLevel = LoggingLevel.DEBUG
|
||||
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
|
||||
# default settings
|
||||
DEFAULT_LOG_SETTINGS: LogSettings = {
|
||||
"log_level_console": "WARNING",
|
||||
"log_level_file": "DEBUG",
|
||||
"console_enabled": True,
|
||||
"console_color_output_enabled": True,
|
||||
"add_start_info": True,
|
||||
"add_end_info": False,
|
||||
"log_queue": None,
|
||||
}
|
||||
"log_level_console": LoggingLevel.WARNING,
|
||||
"log_level_file": LoggingLevel.DEBUG,
|
||||
"console_enabled": True,
|
||||
"console_color_output_enabled": True,
|
||||
"add_start_info": True,
|
||||
"add_end_info": False,
|
||||
"log_queue": None,
|
||||
}
|
||||
|
||||
# MARK: constructor
|
||||
def __init__(
|
||||
self,
|
||||
log_path: Path,
|
||||
log_name: str,
|
||||
log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None = None,
|
||||
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
|
||||
other_handlers: dict[str, Any] | None = None
|
||||
):
|
||||
# add new level for EXCEPTION
|
||||
logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
|
||||
# add new level for alert, emergecny and exception
|
||||
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
|
||||
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
|
||||
logging.addLevelName(LoggingLevel.EXCEPTION.value, LoggingLevel.EXCEPTION.name)
|
||||
# parse the logging settings
|
||||
self.log_settings = self.__parse_log_settings(log_settings)
|
||||
# if path, set log name with .log
|
||||
@@ -119,17 +135,25 @@ class Log:
|
||||
|
||||
self.log_queue: 'Queue[str] | None' = None
|
||||
self.listener: logging.handlers.QueueListener | None = None
|
||||
self.logger: logging.Logger
|
||||
|
||||
# setup handlers
|
||||
# NOTE if console with color is set first, some of the color formatting is set
|
||||
# in the file writer too, for the ones where color is set BEFORE the format
|
||||
self.handlers: list[logging.StreamHandler[TextIO] | logging.handlers.TimedRotatingFileHandler] = [
|
||||
# file handler, always
|
||||
self.__file_handler(self.log_settings['log_level_file'], log_path)
|
||||
]
|
||||
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
|
||||
self.handlers: dict[str, Any] = {}
|
||||
self.add_handler('file_handler', self.__create_time_rotating_file_handler(
|
||||
self.log_settings['log_level_file'], log_path)
|
||||
)
|
||||
if self.log_settings['console_enabled']:
|
||||
# console
|
||||
self.handlers.append(self.__console_handler(self.log_settings['log_level_console']))
|
||||
self.add_handler('stream_handler', self.__create_console_handler(
|
||||
self.log_settings['log_level_console'])
|
||||
)
|
||||
# add other handlers,
|
||||
if other_handlers is not None:
|
||||
for handler_key, handler in other_handlers.items():
|
||||
self.add_handler(handler_key, handler)
|
||||
# init listener if we have a log_queue set
|
||||
self.__init_listener(self.log_settings['log_queue'])
|
||||
|
||||
@@ -139,6 +163,7 @@ class Log:
|
||||
if self.log_settings['add_start_info'] is True:
|
||||
self.break_line('START')
|
||||
|
||||
# MARK: deconstructor
|
||||
def __del__(self):
|
||||
"""
|
||||
Call when class is destroyed, make sure the listender is closed or else we throw a thread error
|
||||
@@ -147,9 +172,10 @@ class Log:
|
||||
self.break_line('END')
|
||||
self.stop_listener()
|
||||
|
||||
# MARK: parse log settings
|
||||
def __parse_log_settings(
|
||||
self,
|
||||
log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None
|
||||
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None
|
||||
) -> LogSettings:
|
||||
# skip with defaul it not set
|
||||
if log_settings is None:
|
||||
@@ -161,11 +187,11 @@ class Log:
|
||||
if log_settings.get(__log_entry) is None:
|
||||
continue
|
||||
# if not valid reset to default, if not in default set to WARNING
|
||||
if not self.validate_log_level(_log_level := log_settings.get(__log_entry, '')):
|
||||
_log_level = self.DEFAULT_LOG_SETTINGS.get(
|
||||
if not self.validate_log_level(__log_level := log_settings.get(__log_entry, '')):
|
||||
__log_level = self.DEFAULT_LOG_SETTINGS.get(
|
||||
__log_entry, self.DEFAULT_LOG_LEVEL
|
||||
)
|
||||
default_log_settings[__log_entry] = str(_log_level)
|
||||
default_log_settings[__log_entry] = LoggingLevel.from_any(__log_level)
|
||||
# check bool
|
||||
for __log_entry in [
|
||||
"console_enabled",
|
||||
@@ -188,10 +214,36 @@ class Log:
|
||||
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||
return record.levelname != "EXCEPTION"
|
||||
|
||||
def __console_handler(self, log_level_console: str = 'WARNING') -> logging.StreamHandler[TextIO]:
|
||||
# MARK: add a handler
|
||||
def add_handler(
|
||||
self,
|
||||
handler_name: str,
|
||||
handler: Any
|
||||
) -> bool:
|
||||
"""
|
||||
Add a log handler to the handlers dict
|
||||
|
||||
Arguments:
|
||||
handler_name {str} -- _description_
|
||||
handler {Any} -- _description_
|
||||
"""
|
||||
if self.handlers.get(handler_name):
|
||||
return False
|
||||
if self.listener is not None or hasattr(self, 'logger'):
|
||||
raise ValueError(
|
||||
f"Cannot add handler {handler_name}: {handler.get_name()} because logger is already running"
|
||||
)
|
||||
# TODO: handler must be some handler type, how to check?
|
||||
self.handlers[handler_name] = handler
|
||||
return True
|
||||
|
||||
# MARK: console handler
|
||||
def __create_console_handler(
|
||||
self, log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
|
||||
) -> logging.StreamHandler[TextIO]:
|
||||
# console logger
|
||||
if not isinstance(getattr(logging, log_level_console.upper(), None), int):
|
||||
log_level_console = 'WARNING'
|
||||
if not self.validate_log_level(log_level_console):
|
||||
log_level_console = self.DEFAULT_LOG_LEVEL_CONSOLE
|
||||
console_handler = logging.StreamHandler()
|
||||
# format layouts
|
||||
format_string = (
|
||||
@@ -207,21 +259,31 @@ class Log:
|
||||
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
|
||||
else:
|
||||
formatter_console = logging.Formatter(format_string, datefmt=format_date)
|
||||
console_handler.setLevel(log_level_console)
|
||||
console_handler.setLevel(log_level_console.name)
|
||||
console_handler.set_name('console')
|
||||
# do not show exceptions logs on console
|
||||
console_handler.addFilter(self.__filter_exceptions)
|
||||
if filter_exceptions:
|
||||
console_handler.addFilter(self.__filter_exceptions)
|
||||
console_handler.setFormatter(formatter_console)
|
||||
return console_handler
|
||||
|
||||
def __file_handler(self, log_level_file: str, log_path: Path) -> logging.handlers.TimedRotatingFileHandler:
|
||||
# MARK: file handler
|
||||
def __create_time_rotating_file_handler(
|
||||
self, log_level_file: LoggingLevel, log_path: Path,
|
||||
when: str = "D", interval: int = 1, backup_count: int = 0
|
||||
) -> logging.handlers.TimedRotatingFileHandler:
|
||||
# file logger
|
||||
if not isinstance(getattr(logging, log_level_file.upper(), None), int):
|
||||
log_level_file = 'DEBUG'
|
||||
# when: S/M/H/D/W0-W6/midnight
|
||||
# interval: how many, 1D = every day
|
||||
# backup_count: how many old to keep, 0 = all
|
||||
if not self.validate_log_level(log_level_file):
|
||||
log_level_file = self.DEFAULT_LOG_LEVEL_FILE
|
||||
file_handler = logging.handlers.TimedRotatingFileHandler(
|
||||
filename=log_path,
|
||||
encoding="utf-8",
|
||||
when="D",
|
||||
interval=1
|
||||
when=when,
|
||||
interval=interval,
|
||||
backupCount=backup_count
|
||||
)
|
||||
formatter_file_handler = logging.Formatter(
|
||||
(
|
||||
@@ -240,10 +302,12 @@ class Log:
|
||||
),
|
||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||
)
|
||||
file_handler.setLevel(log_level_file)
|
||||
file_handler.set_name('file_timed_rotate')
|
||||
file_handler.setLevel(log_level_file.name)
|
||||
file_handler.setFormatter(formatter_file_handler)
|
||||
return file_handler
|
||||
|
||||
# MARK: init listener
|
||||
def __init_listener(self, log_queue: 'Queue[str] | None' = None):
|
||||
"""
|
||||
If we have a Queue option start the logging queue
|
||||
@@ -256,11 +320,12 @@ class Log:
|
||||
self.log_queue = log_queue
|
||||
self.listener = logging.handlers.QueueListener(
|
||||
self.log_queue,
|
||||
*self.handlers,
|
||||
*self.handlers.values(),
|
||||
respect_handler_level=True
|
||||
)
|
||||
self.listener.start()
|
||||
|
||||
# MARK: init main log
|
||||
def __init_log(self, log_name: str) -> None:
|
||||
"""
|
||||
Initialize the main loggger
|
||||
@@ -272,14 +337,18 @@ class Log:
|
||||
self.logger = logging.getLogger(log_name)
|
||||
# add all the handlers
|
||||
if queue_handler is None:
|
||||
for handler in self.handlers:
|
||||
for handler in self.handlers.values():
|
||||
self.logger.addHandler(handler)
|
||||
else:
|
||||
self.logger.addHandler(queue_handler)
|
||||
# set maximum logging level for all logging output
|
||||
# log level filtering is done per handler
|
||||
self.logger.setLevel(logging.DEBUG)
|
||||
# short name
|
||||
self.lg = self.logger
|
||||
self.l = self.logger
|
||||
|
||||
# MARK: init logger for Fork/Thread
|
||||
@staticmethod
|
||||
def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger:
|
||||
"""
|
||||
@@ -288,6 +357,7 @@ class Log:
|
||||
queue_handler = logging.handlers.QueueHandler(log_queue)
|
||||
# getLogger call MUST be WITHOUT and logger name
|
||||
root_logger = logging.getLogger()
|
||||
# base logging level, filtering is done in the handlers
|
||||
root_logger.setLevel(logging.DEBUG)
|
||||
root_logger.handlers.clear()
|
||||
root_logger.addHandler(queue_handler)
|
||||
@@ -297,13 +367,110 @@ class Log:
|
||||
|
||||
return root_logger
|
||||
|
||||
def stop_listener(self):
|
||||
"""
|
||||
stop the listener
|
||||
"""
|
||||
if self.listener is not None:
|
||||
self.listener.stop()
|
||||
# FIXME: we need to add a custom formater to add stack level listing if we want to
|
||||
# Important note, although they exist, it is recommended to use self.logger.NAME directly
|
||||
# so that the correct filename, method and row number is set
|
||||
# for > 50 use logger.log(LoggingLevel.<LEVEL>.value, ...)
|
||||
# for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True)
|
||||
# MARK: log message
|
||||
def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None):
|
||||
"""log general"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.log(level, msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: DEBUG 10
|
||||
def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""debug"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.debug(msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: INFO 20
|
||||
def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""info"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.info(msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: WARNING 30
|
||||
def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""warning"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.warning(msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: ERROR 40
|
||||
def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""error"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.error(msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: CRITICAL 50
|
||||
def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""critcal"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.critical(msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: ALERT 55
|
||||
def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""alert"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
# extra_dict = dict(extra)
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: EMERGECNY: 60
|
||||
def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""emergency"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: EXCEPTION: 70
|
||||
def exception(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||
"""
|
||||
log on exceotion level, this is log.exception, but logs with a new level
|
||||
|
||||
Args:
|
||||
msg (object): _description_
|
||||
*args (object): arguments for msg
|
||||
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
||||
"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra['stack_trace'] = traceback_call_str(start=3)
|
||||
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
|
||||
|
||||
# MARK: break line
|
||||
def break_line(self, info: str = "BREAK"):
|
||||
"""
|
||||
add a break line as info level
|
||||
@@ -311,22 +478,100 @@ class Log:
|
||||
Keyword Arguments:
|
||||
info {str} -- _description_ (default: {"BREAK"})
|
||||
"""
|
||||
if not hasattr(self, 'logger'):
|
||||
raise ValueError('Logger is not yet initialized')
|
||||
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
|
||||
|
||||
def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
|
||||
# MARK: queue handling
|
||||
def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool:
|
||||
"""
|
||||
log on exceotion level
|
||||
Flush all pending messages
|
||||
|
||||
Args:
|
||||
msg (object): _description_
|
||||
*args (object): arguments for msg
|
||||
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
||||
"""
|
||||
self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)
|
||||
Keyword Arguments:
|
||||
handler_name {str | None} -- _description_ (default: {None})
|
||||
timeout {float} -- _description_ (default: {2.0})
|
||||
|
||||
def validate_log_level(self, log_level: Any) -> bool:
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
if the log level is invalid, will erturn false
|
||||
if not self.listener or not self.log_queue:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Wait for queue to be processed
|
||||
start_time = time.time()
|
||||
while not self.log_queue.empty() and (time.time() - start_time) < timeout:
|
||||
time.sleep(0.01)
|
||||
|
||||
# Flush all handlers or handler given
|
||||
if handler_name:
|
||||
try:
|
||||
self.handlers[handler_name].flush()
|
||||
except IndexError:
|
||||
pass
|
||||
else:
|
||||
for handler in self.handlers.values():
|
||||
handler.flush()
|
||||
except OSError:
|
||||
return False
|
||||
return True
|
||||
|
||||
def stop_listener(self):
|
||||
"""
|
||||
stop the listener
|
||||
"""
|
||||
if self.listener is not None:
|
||||
self.flush()
|
||||
self.listener.stop()
|
||||
|
||||
# MARK: log level handling
|
||||
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
|
||||
"""
|
||||
set the logging level for a handler
|
||||
|
||||
Arguments:
|
||||
handler {str} -- _description_
|
||||
log_level {LoggingLevel} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
try:
|
||||
# flush queue befoe changing logging level
|
||||
self.flush(handler_name)
|
||||
self.handlers[handler_name].setLevel(log_level.name)
|
||||
return True
|
||||
except IndexError:
|
||||
if self.logger:
|
||||
self.logger.error('Handler %s not found, cannot change log level', handler_name)
|
||||
return False
|
||||
except AttributeError:
|
||||
if self.logger:
|
||||
self.logger.error(
|
||||
'Cannot change to log level %s for handler %s, log level invalid',
|
||||
LoggingLevel.name, handler_name
|
||||
)
|
||||
return False
|
||||
|
||||
def get_log_level(self, handler_name: str) -> LoggingLevel:
|
||||
"""
|
||||
gettthe logging level for a handler
|
||||
|
||||
Arguments:
|
||||
handler_name {str} -- _description_
|
||||
|
||||
Returns:
|
||||
LoggingLevel -- _description_
|
||||
"""
|
||||
try:
|
||||
return self.handlers[handler_name]
|
||||
except IndexError:
|
||||
return LoggingLevel.NOTSET
|
||||
|
||||
@staticmethod
|
||||
def validate_log_level(log_level: Any) -> bool:
|
||||
"""
|
||||
if the log level is invalid will return false, else return true
|
||||
|
||||
Args:
|
||||
log_level (Any): _description_
|
||||
@@ -334,11 +579,27 @@ class Log:
|
||||
Returns:
|
||||
bool: _description_
|
||||
"""
|
||||
if isinstance(log_level, int):
|
||||
return any(getattr(logging, attr) == log_level for attr in dir(logging))
|
||||
elif isinstance(log_level, str):
|
||||
return isinstance(getattr(logging, log_level.upper(), None), int)
|
||||
else:
|
||||
try:
|
||||
_ = LoggingLevel.from_any(log_level).value
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def get_log_level_int(log_level: Any) -> int:
|
||||
"""
|
||||
Return log level as INT
|
||||
If invalid returns the default log level
|
||||
|
||||
Arguments:
|
||||
log_level {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
try:
|
||||
return LoggingLevel.from_any(log_level).value
|
||||
except ValueError:
|
||||
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
|
||||
|
||||
# __END__
|
||||
|
||||
@@ -0,0 +1,90 @@
|
||||
"""
|
||||
All logging levels
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class LoggingLevel(Enum):
|
||||
"""
|
||||
Log class levels
|
||||
"""
|
||||
NOTSET = logging.NOTSET # 0
|
||||
DEBUG = logging.DEBUG # 10
|
||||
INFO = logging.INFO # 20
|
||||
WARNING = logging.WARNING # 30
|
||||
ERROR = logging.ERROR # 40
|
||||
CRITICAL = logging.CRITICAL # 50
|
||||
ALERT = 55 # 55 (for Sys log)
|
||||
EMERGENCY = 60 # 60 (for Sys log)
|
||||
EXCEPTION = 70 # 70 (manualy set, error but with higher level)
|
||||
# Alternative names
|
||||
WARN = logging.WARN # 30 (alias for WARNING)
|
||||
FATAL = logging.FATAL # 50 (alias for CRITICAL)
|
||||
|
||||
# Optional: Add string representation for better readability
|
||||
@classmethod
|
||||
def from_string(cls, level_str: str):
|
||||
"""Convert string to LogLevel enum"""
|
||||
try:
|
||||
return cls[level_str.upper()]
|
||||
except KeyError as e:
|
||||
raise ValueError(f"Invalid log level: {level_str}") from e
|
||||
except AttributeError as e:
|
||||
raise ValueError(f"Invalid log level: {level_str}") from e
|
||||
|
||||
@classmethod
|
||||
def from_int(cls, level_int: int):
|
||||
"""Convert integer to LogLevel enum"""
|
||||
try:
|
||||
return cls(level_int)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid log level: {level_int}") from e
|
||||
|
||||
@classmethod
|
||||
def from_any(cls, level_any: Any):
|
||||
"""
|
||||
Convert any vale
|
||||
if self LoggingLevel return as is, else try to convert from int or string
|
||||
|
||||
Arguments:
|
||||
level_any {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
_type_ -- _description_
|
||||
"""
|
||||
if isinstance(level_any, LoggingLevel):
|
||||
return level_any
|
||||
if isinstance(level_any, int):
|
||||
return cls.from_int(level_any)
|
||||
return cls.from_string(level_any)
|
||||
|
||||
def to_logging_level(self):
|
||||
"""Convert to logging module level"""
|
||||
return self.value
|
||||
|
||||
def to_lower_case(self):
|
||||
"""return loser case"""
|
||||
return self.name.lower()
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def includes(self, level: 'LoggingLevel'):
|
||||
"""
|
||||
if given level is included in set level
|
||||
eg: INFO set, ERROR is included in INFO because INFO level would print ERROR
|
||||
"""
|
||||
return self.value <= level.value
|
||||
|
||||
def is_higher_than(self, level: 'LoggingLevel'):
|
||||
"""if given value is higher than set"""
|
||||
return self.value > level.value
|
||||
|
||||
def is_lower_than(self, level: 'LoggingLevel'):
|
||||
"""if given value is lower than set"""
|
||||
return self.value < level.value
|
||||
|
||||
# __END__
|
||||
0
src/corelibs/var_handling/__init__.py
Normal file
0
src/corelibs/var_handling/__init__.py
Normal file
65
src/corelibs/var_handling/var_helpers.py
Normal file
65
src/corelibs/var_handling/var_helpers.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""
|
||||
variable convert, check, etc helepr
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def is_int(string: Any) -> bool:
|
||||
"""
|
||||
check if a value is int
|
||||
|
||||
Arguments:
|
||||
string {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
try:
|
||||
int(string)
|
||||
return True
|
||||
except TypeError:
|
||||
return False
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def is_float(string: Any) -> bool:
|
||||
"""
|
||||
check if a value is float
|
||||
|
||||
Arguments:
|
||||
string {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
try:
|
||||
float(string)
|
||||
return True
|
||||
except TypeError:
|
||||
return False
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def str_to_bool(string: str):
|
||||
"""
|
||||
convert string to bool
|
||||
|
||||
Arguments:
|
||||
s {str} -- _description_
|
||||
|
||||
Raises:
|
||||
ValueError: _description_
|
||||
|
||||
Returns:
|
||||
_type_ -- _description_
|
||||
"""
|
||||
if string == "True" or string == "true":
|
||||
return True
|
||||
if string == "False" or string == "false":
|
||||
return False
|
||||
raise ValueError(f"Invalid boolean string: {string}")
|
||||
|
||||
# __END__
|
||||
29
test-run/config_handling/config/settings.ini
Normal file
29
test-run/config_handling/config/settings.ini
Normal file
@@ -0,0 +1,29 @@
|
||||
[TestA]
|
||||
foo=bar
|
||||
foobar=1
|
||||
bar=st
|
||||
some_match=foo
|
||||
some_match_list=foo,bar
|
||||
test_list=a,b,c,d f, g h
|
||||
other_list=a|b|c|d|
|
||||
third_list=xy|ab|df|fg
|
||||
str_length=foobar
|
||||
int_range=20
|
||||
int_range_not_set=
|
||||
int_range_not_set_empty_set=5
|
||||
#
|
||||
match_target=foo
|
||||
match_target_list=foo,bar,baz
|
||||
#
|
||||
match_source_a=foo
|
||||
match_source_b=foo
|
||||
; match_source_c=foo
|
||||
match_source_list=foo,bar
|
||||
|
||||
[TestB]
|
||||
element_a=Static energy
|
||||
element_b=123.5
|
||||
element_c=True
|
||||
email=foo@bar.com,other+bar-fee@domain-com.cp,
|
||||
email_not_mandatory=
|
||||
email_bad=gii@bar.com
|
||||
2
test-run/config_handling/log/.gitignore
vendored
Normal file
2
test-run/config_handling/log/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
*
|
||||
!.gitignore
|
||||
118
test-run/config_handling/settings_loader.py
Normal file
118
test-run/config_handling/settings_loader.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""
|
||||
Settings loader test
|
||||
"""
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs.logging_handling.log import Log
|
||||
from corelibs.config_handling.settings_loader import SettingsLoader
|
||||
from corelibs.config_handling.settings_loader_handling.settings_loader_check import SettingsLoaderCheck
|
||||
|
||||
SCRIPT_PATH: Path = Path(__file__).resolve().parent
|
||||
ROOT_PATH: Path = SCRIPT_PATH
|
||||
CONFIG_DIR: Path = Path("config")
|
||||
CONFIG_FILE: str = "settings.ini"
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main run
|
||||
"""
|
||||
|
||||
value = "2025/1/1"
|
||||
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
|
||||
result = regex_c.search(value)
|
||||
print(f"regex {regex_c} check against {value} -> {result}")
|
||||
|
||||
# for log testing
|
||||
script_path: Path = Path(__file__).resolve().parent
|
||||
log = Log(
|
||||
log_path=script_path.joinpath('log', 'settings_loader.log'),
|
||||
log_name="Settings Loader",
|
||||
log_settings={
|
||||
"log_level_console": 'DEBUG',
|
||||
"log_level_file": 'DEBUG',
|
||||
}
|
||||
)
|
||||
log.logger.info('Settings loader')
|
||||
|
||||
sl = SettingsLoader(
|
||||
{
|
||||
'foo': 'OVERLOAD'
|
||||
},
|
||||
ROOT_PATH.joinpath(CONFIG_DIR, CONFIG_FILE),
|
||||
log=log
|
||||
)
|
||||
try:
|
||||
config_load = 'TestA'
|
||||
config_data = sl.load_settings(
|
||||
config_load,
|
||||
{
|
||||
# "doesnt": ["split:,"],
|
||||
"foo": ["mandatory:yes"],
|
||||
"foobar": ["check:int"],
|
||||
"bar": ["mandatory:yes"],
|
||||
"some_match": ["matching:foo|bar"],
|
||||
"some_match_list": ["split:,", "matching:foo|bar"],
|
||||
"test_list": [
|
||||
"check:string.alphanumeric",
|
||||
"split:,"
|
||||
],
|
||||
"other_list": ["split:|"],
|
||||
"third_list": [
|
||||
"split:|",
|
||||
"check:string.alphanumeric"
|
||||
],
|
||||
"str_length": [
|
||||
"length:2-10"
|
||||
],
|
||||
"int_range": [
|
||||
"range:2-50"
|
||||
],
|
||||
"int_range_not_set": [
|
||||
"range:2-50"
|
||||
],
|
||||
"int_range_not_set_empty_set": [
|
||||
"empty:"
|
||||
],
|
||||
"match_target": ["matching:foo"],
|
||||
"match_target_list": ["split:,", "matching:foo|bar|baz",],
|
||||
"match_source_a": ["in:match_target"],
|
||||
"match_source_b": ["in:match_target_list"],
|
||||
"match_source_list": ["split:,", "in:match_target_list"],
|
||||
}
|
||||
)
|
||||
print(f"[{config_load}] Load: {config_load} -> {dump_data(config_data)}")
|
||||
except ValueError as e:
|
||||
print(f"Could not load settings: {e}")
|
||||
|
||||
try:
|
||||
config_load = 'TestB'
|
||||
config_data = sl.load_settings(
|
||||
config_load,
|
||||
{
|
||||
"email": [
|
||||
"split:,",
|
||||
"mandatory:yes",
|
||||
"check:string.email.basic"
|
||||
],
|
||||
"email_not_mandatory": [
|
||||
"split:,",
|
||||
# "mandatory:yes",
|
||||
"check:string.email.basic"
|
||||
],
|
||||
"email_bad": [
|
||||
"split:,",
|
||||
"mandatory:yes",
|
||||
"check:string.email.basic"
|
||||
]
|
||||
}
|
||||
)
|
||||
print(f"[{config_load}] Load: {config_load} -> {dump_data(config_data)}")
|
||||
except ValueError as e:
|
||||
print(f"Could not load settings: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
29
test-run/iterator_handling/list_helpers.py
Normal file
29
test-run/iterator_handling/list_helpers.py
Normal file
@@ -0,0 +1,29 @@
|
||||
"""
|
||||
test list helpers
|
||||
"""
|
||||
|
||||
from corelibs.iterator_handling.list_helpers import is_list_in_list, convert_to_list
|
||||
|
||||
|
||||
def __test_is_list_in_list_a():
|
||||
list_a = [1, "hello", 3.14, True, "world"]
|
||||
list_b = ["hello", True, 42]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
print(f"RESULT: {result}")
|
||||
|
||||
|
||||
def __convert_list():
|
||||
source = "hello"
|
||||
result = convert_to_list(source)
|
||||
print(f"IN: {source} -> {result}")
|
||||
|
||||
|
||||
def main():
|
||||
__test_is_list_in_list_a()
|
||||
__convert_list()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# __END__
|
||||
@@ -6,6 +6,7 @@ Log logging_handling.log testing
|
||||
from pathlib import Path
|
||||
# this is for testing only
|
||||
from corelibs.logging_handling.log import Log
|
||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||
|
||||
|
||||
def main():
|
||||
@@ -17,18 +18,75 @@ def main():
|
||||
log_path=script_path.joinpath('log', 'test.log'),
|
||||
log_name="Test Log",
|
||||
log_settings={
|
||||
"log_level_console": 'WARNING',
|
||||
# "log_level_console": 'DEBUG',
|
||||
"log_level_console": None,
|
||||
"log_level_file": 'DEBUG',
|
||||
# "console_color_output_enabled": False,
|
||||
}
|
||||
)
|
||||
|
||||
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
|
||||
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
|
||||
log.info('[NORMAL-] Info test: %s', log.logger.name)
|
||||
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
|
||||
log.warning('[NORMAL-] Warning test: %s', log.logger.name)
|
||||
log.logger.error('[NORMAL] Error test: %s', log.logger.name)
|
||||
log.error('[NORMAL-] Error test: %s', log.logger.name)
|
||||
log.logger.critical('[NORMAL] Critical test: %s', log.logger.name)
|
||||
log.critical('[NORMAL-] Critical test: %s', log.logger.name)
|
||||
log.logger.log(LoggingLevel.ALERT.value, '[NORMAL] alert test: %s', log.logger.name)
|
||||
log.alert('[NORMAL-] alert test: %s', log.logger.name)
|
||||
log.emergency('[NORMAL-] emergency test: %s', log.logger.name)
|
||||
log.logger.log(LoggingLevel.EMERGENCY.value, '[NORMAL] emergency test: %s', log.logger.name)
|
||||
log.exception('[NORMAL] Exception test: %s', log.logger.name)
|
||||
log.logger.log(LoggingLevel.EXCEPTION.value, '[NORMAL] exception test: %s', log.logger.name, exc_info=True)
|
||||
|
||||
bad_level = 'WRONG'
|
||||
if not Log.validate_log_level(bad_level):
|
||||
print(f"Invalid level: {bad_level}")
|
||||
good_level = 'WARNING'
|
||||
if Log.validate_log_level(good_level):
|
||||
print(f"Valid level: {good_level}")
|
||||
|
||||
print(f"ERROR is to_logging_level(): {LoggingLevel.ERROR.to_logging_level()}")
|
||||
print(f"ERROR is to_lower_case(): {LoggingLevel.ERROR.to_lower_case()}")
|
||||
print(f"ERROR is: {LoggingLevel.ERROR}")
|
||||
print(f"ERROR is value: {LoggingLevel.ERROR.value}")
|
||||
print(f"ERROR is name: {LoggingLevel.ERROR.name}")
|
||||
print(f"ERROR is from_string(lower): {LoggingLevel.from_string('ERROR')}")
|
||||
print(f"ERROR is from_string(upper): {LoggingLevel.from_string('ERROR')}")
|
||||
print(f"ERROR is from_int: {LoggingLevel.from_int(40)}")
|
||||
print(f"ERROR is from_any(text lower): {LoggingLevel.from_any('ERROR')}")
|
||||
print(f"ERROR is from_any(text upper): {LoggingLevel.from_any('ERROR')}")
|
||||
print(f"ERROR is from_any(int): {LoggingLevel.from_any(40)}")
|
||||
print(f"INFO <= ERROR: {LoggingLevel.INFO.includes(LoggingLevel.ERROR)}")
|
||||
print(f"INFO > ERROR: {LoggingLevel.INFO.is_higher_than(LoggingLevel.ERROR)}")
|
||||
print(f"INFO < ERROR: {LoggingLevel.INFO.is_lower_than(LoggingLevel.ERROR)}")
|
||||
print(f"INFO < ERROR: {LoggingLevel.INFO.is_lower_than(LoggingLevel.ERROR)}")
|
||||
|
||||
try:
|
||||
print(f"INVALID is A: {LoggingLevel.from_string('INVALID')}")
|
||||
except ValueError as e:
|
||||
print(f"* ERROR: {e}")
|
||||
|
||||
try:
|
||||
__test = 5 / 0
|
||||
print(f"Divied: {__test}")
|
||||
except ZeroDivisionError as e:
|
||||
log.logger.critical("Divison through zero: %s", e)
|
||||
log.exception("Divison through zero")
|
||||
|
||||
for handler in log.logger.handlers:
|
||||
print(f"Handler (logger) {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
||||
|
||||
for key, handler in log.handlers.items():
|
||||
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
||||
log.set_log_level('stream_handler', LoggingLevel.ERROR)
|
||||
log.logger.warning('[NORMAL] Invisible Warning test: %s', log.logger.name)
|
||||
log.logger.error('[NORMAL] Visible Error test: %s', log.logger.name)
|
||||
# log.handlers['stream_handler'].se
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -10,6 +10,7 @@ import concurrent.futures
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from corelibs.logging_handling.log import Log
|
||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||
|
||||
|
||||
def work_function(log_name: str, worker_id: int, data: list[int]) -> int:
|
||||
@@ -46,7 +47,8 @@ def main():
|
||||
"log_queue": log_queue,
|
||||
}
|
||||
)
|
||||
log.logger.info('Pool Fork logging test')
|
||||
|
||||
log.logger.debug('Pool Fork logging test')
|
||||
max_forks = 2
|
||||
data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
|
||||
with concurrent.futures.ProcessPoolExecutor(
|
||||
@@ -62,9 +64,23 @@ def main():
|
||||
log.logger.info('Workders started')
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
log.logger.info('Processing result: %s', future.result())
|
||||
log.logger.warning('Processing result: %s', future.result())
|
||||
print(f"Processing result: {future.result()}")
|
||||
|
||||
log.set_log_level('stream_handler', LoggingLevel.ERROR)
|
||||
log.logger.error('SECOND Start workers')
|
||||
futures = [
|
||||
executor.submit(work_function, log.log_name, worker_id, data)
|
||||
for worker_id, data in enumerate(data_sets, 1)
|
||||
]
|
||||
log.logger.info('[INVISIBLE] Workders started')
|
||||
log.logger.error('[VISIBLE] Second Workders started')
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
log.logger.error('Processing result: %s', future.result())
|
||||
print(f"Processing result: {future.result()}")
|
||||
|
||||
log.set_log_level('stream_handler', LoggingLevel.DEBUG)
|
||||
log.logger.info('[END] Queue logger test')
|
||||
log.stop_listener()
|
||||
|
||||
|
||||
@@ -5,8 +5,10 @@ Log logging_handling.log testing
|
||||
# import atexit
|
||||
from pathlib import Path
|
||||
from multiprocessing import Queue
|
||||
import time
|
||||
# this is for testing only
|
||||
from corelibs.logging_handling.log import Log
|
||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||
|
||||
|
||||
def main():
|
||||
@@ -32,7 +34,29 @@ def main():
|
||||
log_q.logger.warning('[QUEUE] Warning test: %s', log_q.logger.name)
|
||||
log_q.logger.error('[QUEUE] Error test: %s', log_q.logger.name)
|
||||
log_q.logger.critical('[QUEUE] Critical test: %s', log_q.logger.name)
|
||||
log_q.exception('[QUEUE] Exception test: %s', log_q.logger.name)
|
||||
log_q.logger.log(LoggingLevel.EXCEPTION.value, '[QUEUE] Exception test: %s', log_q.logger.name, exc_info=True)
|
||||
time.sleep(0.1)
|
||||
|
||||
for handler in log_q.logger.handlers:
|
||||
print(f"[1] Handler (logger) {handler}")
|
||||
if log_q.listener is not None:
|
||||
for handler in log_q.listener.handlers:
|
||||
print(f"[1] Handler (queue) {handler}")
|
||||
for handler in log_q.handlers.items():
|
||||
print(f"[1] Handler (handlers) {handler}")
|
||||
|
||||
log_q.set_log_level('stream_handler', LoggingLevel.ERROR)
|
||||
log_q.logger.warning('[QUEUE-B] [INVISIBLE] Warning test: %s', log_q.logger.name)
|
||||
log_q.logger.error('[QUEUE-B] [VISIBLE] Error test: %s', log_q.logger.name)
|
||||
|
||||
for handler in log_q.logger.handlers:
|
||||
print(f"[2] Handler (logger) {handler}")
|
||||
if log_q.listener is not None:
|
||||
for handler in log_q.listener.handlers:
|
||||
print(f"[2] Handler (queue) {handler}")
|
||||
for handler in log_q.handlers.items():
|
||||
print(f"[2] Handler (handlers) {handler}")
|
||||
|
||||
log_q.stop_listener()
|
||||
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ def __sh_format_number():
|
||||
print(f"Format {number} ({precision}) -> {result}")
|
||||
|
||||
|
||||
def _sh_colors():
|
||||
def __sh_colors():
|
||||
for color in [
|
||||
"black",
|
||||
"red",
|
||||
@@ -79,7 +79,7 @@ def main():
|
||||
"""
|
||||
__sh_shorten_string()
|
||||
__sh_format_number()
|
||||
_sh_colors()
|
||||
__sh_colors()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
0
tests/integration/__init__.py
Normal file
0
tests/integration/__init__.py
Normal file
300
tests/unit/iterator_handling/test_list_helpers.py
Normal file
300
tests/unit/iterator_handling/test_list_helpers.py
Normal file
@@ -0,0 +1,300 @@
|
||||
"""
|
||||
iterator_handling.list_helepr tests
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
import pytest
|
||||
from corelibs.iterator_handling.list_helpers import convert_to_list, is_list_in_list
|
||||
|
||||
|
||||
class TestConvertToList:
|
||||
"""Test cases for convert_to_list function"""
|
||||
|
||||
def test_string_input(self):
|
||||
"""Test with string inputs"""
|
||||
assert convert_to_list("hello") == ["hello"]
|
||||
assert convert_to_list("") == [""]
|
||||
assert convert_to_list("123") == ["123"]
|
||||
assert convert_to_list("true") == ["true"]
|
||||
|
||||
def test_integer_input(self):
|
||||
"""Test with integer inputs"""
|
||||
assert convert_to_list(42) == [42]
|
||||
assert convert_to_list(0) == [0]
|
||||
assert convert_to_list(-10) == [-10]
|
||||
assert convert_to_list(999999) == [999999]
|
||||
|
||||
def test_float_input(self):
|
||||
"""Test with float inputs"""
|
||||
assert convert_to_list(3.14) == [3.14]
|
||||
assert convert_to_list(0.0) == [0.0]
|
||||
assert convert_to_list(-2.5) == [-2.5]
|
||||
assert convert_to_list(1.0) == [1.0]
|
||||
|
||||
def test_boolean_input(self):
|
||||
"""Test with boolean inputs"""
|
||||
assert convert_to_list(True) == [True]
|
||||
assert convert_to_list(False) == [False]
|
||||
|
||||
def test_list_input_unchanged(self):
|
||||
"""Test that list inputs are returned unchanged"""
|
||||
# String lists
|
||||
str_list = ["a", "b", "c"]
|
||||
assert convert_to_list(str_list) == str_list
|
||||
assert convert_to_list(str_list) is str_list # Same object reference
|
||||
|
||||
# Integer lists
|
||||
int_list = [1, 2, 3]
|
||||
assert convert_to_list(int_list) == int_list
|
||||
assert convert_to_list(int_list) is int_list
|
||||
|
||||
# Float lists
|
||||
float_list = [1.1, 2.2, 3.3]
|
||||
assert convert_to_list(float_list) == float_list
|
||||
assert convert_to_list(float_list) is float_list
|
||||
|
||||
# Boolean lists
|
||||
bool_list = [True, False, True]
|
||||
assert convert_to_list(bool_list) == bool_list
|
||||
assert convert_to_list(bool_list) is bool_list
|
||||
|
||||
# Mixed lists
|
||||
mixed_list = [1, "hello", 3.14, True]
|
||||
assert convert_to_list(mixed_list) == mixed_list
|
||||
assert convert_to_list(mixed_list) is mixed_list
|
||||
|
||||
# Empty list
|
||||
empty_list: list[int] = []
|
||||
assert convert_to_list(empty_list) == empty_list
|
||||
assert convert_to_list(empty_list) is empty_list
|
||||
|
||||
def test_nested_lists(self):
|
||||
"""Test with nested lists (should still return the same list)"""
|
||||
nested_list: list[list[int]] = [[1, 2], [3, 4]]
|
||||
assert convert_to_list(nested_list) == nested_list
|
||||
assert convert_to_list(nested_list) is nested_list
|
||||
|
||||
def test_single_element_lists(self):
|
||||
"""Test with single element lists"""
|
||||
single_str = ["hello"]
|
||||
assert convert_to_list(single_str) == single_str
|
||||
assert convert_to_list(single_str) is single_str
|
||||
|
||||
single_int = [42]
|
||||
assert convert_to_list(single_int) == single_int
|
||||
assert convert_to_list(single_int) is single_int
|
||||
|
||||
|
||||
class TestIsListInList:
|
||||
"""Test cases for is_list_in_list function"""
|
||||
|
||||
def test_string_lists(self):
|
||||
"""Test with string lists"""
|
||||
list_a = ["a", "b", "c", "d"]
|
||||
list_b = ["b", "d", "e"]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {"a", "c"}
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_integer_lists(self):
|
||||
"""Test with integer lists"""
|
||||
list_a = [1, 2, 3, 4, 5]
|
||||
list_b = [2, 4, 6]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {1, 3, 5}
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_float_lists(self):
|
||||
"""Test with float lists"""
|
||||
list_a = [1.1, 2.2, 3.3, 4.4]
|
||||
list_b = [2.2, 4.4, 5.5]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {1.1, 3.3}
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_boolean_lists(self):
|
||||
"""Test with boolean lists"""
|
||||
list_a = [True, False, True]
|
||||
list_b = [True]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {False}
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_mixed_type_lists(self):
|
||||
"""Test with mixed type lists"""
|
||||
list_a = [1, "hello", 3.14, True, "world"]
|
||||
list_b = ["hello", True, 42]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {1, 3.14, "world"}
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_empty_lists(self):
|
||||
"""Test with empty lists"""
|
||||
# Empty list_a
|
||||
assert is_list_in_list([], [1, 2, 3]) == []
|
||||
|
||||
# Empty list_b
|
||||
list_a = [1, 2, 3]
|
||||
result = is_list_in_list(list_a, [])
|
||||
assert set(result) == {1, 2, 3}
|
||||
|
||||
# Both empty
|
||||
assert is_list_in_list([], []) == []
|
||||
|
||||
def test_no_common_elements(self):
|
||||
"""Test when lists have no common elements"""
|
||||
list_a = [1, 2, 3]
|
||||
list_b = [4, 5, 6]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {1, 2, 3}
|
||||
|
||||
def test_all_elements_common(self):
|
||||
"""Test when all elements in list_a are in list_b"""
|
||||
list_a = [1, 2, 3]
|
||||
list_b = [1, 2, 3, 4, 5]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert result == []
|
||||
|
||||
def test_identical_lists(self):
|
||||
"""Test with identical lists"""
|
||||
list_a = [1, 2, 3]
|
||||
list_b = [1, 2, 3]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert result == []
|
||||
|
||||
def test_duplicate_elements(self):
|
||||
"""Test with duplicate elements in lists"""
|
||||
list_a = [1, 2, 2, 3, 3, 3]
|
||||
list_b = [2, 4]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
# Should return unique elements only (set behavior)
|
||||
assert set(result) == {1, 3}
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_list_b_larger_than_list_a(self):
|
||||
"""Test when list_b is larger than list_a"""
|
||||
list_a = [1, 2]
|
||||
list_b = [2, 3, 4, 5, 6, 7, 8]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {1}
|
||||
|
||||
def test_order_independence(self):
|
||||
"""Test that order doesn't matter due to set operations"""
|
||||
list_a = [3, 1, 4, 1, 5]
|
||||
list_b = [1, 2, 6]
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == {3, 4, 5}
|
||||
|
||||
|
||||
# Parametrized tests for more comprehensive coverage
|
||||
class TestParametrized:
|
||||
"""Parametrized tests for better coverage"""
|
||||
|
||||
@pytest.mark.parametrize("input_value,expected", [
|
||||
("hello", ["hello"]),
|
||||
(42, [42]),
|
||||
(3.14, [3.14]),
|
||||
(True, [True]),
|
||||
(False, [False]),
|
||||
("", [""]),
|
||||
(0, [0]),
|
||||
(0.0, [0.0]),
|
||||
(-1, [-1]),
|
||||
(-2.5, [-2.5]),
|
||||
])
|
||||
def test_convert_to_list_parametrized(self, input_value: Any, expected: Any):
|
||||
"""Test convert_to_list with various single values"""
|
||||
assert convert_to_list(input_value) == expected
|
||||
|
||||
@pytest.mark.parametrize("input_list", [
|
||||
[1, 2, 3],
|
||||
["a", "b", "c"],
|
||||
[1.1, 2.2, 3.3],
|
||||
[True, False],
|
||||
[1, "hello", 3.14, True],
|
||||
[],
|
||||
[42],
|
||||
[[1, 2], [3, 4]],
|
||||
])
|
||||
def test_convert_to_list_with_lists_parametrized(self, input_list: Any):
|
||||
"""Test convert_to_list with various list inputs"""
|
||||
result = convert_to_list(input_list)
|
||||
assert result == input_list
|
||||
assert result is input_list # Same object reference
|
||||
|
||||
@pytest.mark.parametrize("list_a,list_b,expected_set", [
|
||||
([1, 2, 3], [2], {1, 3}),
|
||||
(["a", "b", "c"], ["b", "d"], {"a", "c"}),
|
||||
([1, 2, 3], [4, 5, 6], {1, 2, 3}),
|
||||
([1, 2, 3], [1, 2, 3], set()),
|
||||
([], [1, 2, 3], set()),
|
||||
([1, 2, 3], [], {1, 2, 3}),
|
||||
([True, False], [True], {False}),
|
||||
([1.1, 2.2, 3.3], [2.2], {1.1, 3.3}),
|
||||
])
|
||||
def test_is_list_in_list_parametrized(self, list_a: list[Any], list_b: list[Any], expected_set: Any):
|
||||
"""Test is_list_in_list with various input combinations"""
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
assert set(result) == expected_set
|
||||
assert isinstance(result, list)
|
||||
|
||||
|
||||
# Edge cases and special scenarios
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and special scenarios"""
|
||||
|
||||
def test_convert_to_list_with_none_like_values(self):
|
||||
"""Test convert_to_list with None-like values (if function supports them)"""
|
||||
# Note: Based on type hints, None is not supported, but testing behavior
|
||||
# This test might need to be adjusted based on actual function behavior
|
||||
pass
|
||||
|
||||
def test_is_list_in_list_preserves_type_distinctions(self):
|
||||
"""Test that different types are treated as different"""
|
||||
list_a = [1, "1", 1.0, True]
|
||||
list_b = [1] # Only integer 1
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
|
||||
# Note: This test depends on how Python's set handles type equality
|
||||
# 1, 1.0, and True are considered equal in sets
|
||||
# "1" is different from 1
|
||||
# expected_items = {"1"} # String "1" should remain
|
||||
assert "1" in result
|
||||
assert isinstance(result, list)
|
||||
|
||||
def test_large_lists(self):
|
||||
"""Test with large lists"""
|
||||
large_list_a = list(range(1000))
|
||||
large_list_b = list(range(500, 1500))
|
||||
result = is_list_in_list(large_list_a, large_list_b)
|
||||
expected = list(range(500)) # 0 to 499
|
||||
assert set(result) == set(expected)
|
||||
|
||||
def test_memory_efficiency(self):
|
||||
"""Test that convert_to_list doesn't create unnecessary copies"""
|
||||
original_list = [1, 2, 3, 4, 5]
|
||||
result = convert_to_list(original_list)
|
||||
|
||||
# Should be the same object, not a copy
|
||||
assert result is original_list
|
||||
|
||||
# Modifying the original should affect the result
|
||||
original_list.append(6)
|
||||
assert 6 in result
|
||||
|
||||
|
||||
# Performance tests (optional)
|
||||
class TestPerformance:
|
||||
"""Performance-related tests"""
|
||||
|
||||
def test_is_list_in_list_with_duplicates_performance(self):
|
||||
"""Test that function handles duplicates efficiently"""
|
||||
# List with many duplicates
|
||||
list_a = [1, 2, 3] * 100 # 300 elements, many duplicates
|
||||
list_b = [2] * 50 # 50 elements, all the same
|
||||
|
||||
result = is_list_in_list(list_a, list_b)
|
||||
|
||||
# Should still work correctly despite duplicates
|
||||
assert set(result) == {1, 3}
|
||||
assert isinstance(result, list)
|
||||
@@ -0,0 +1,341 @@
|
||||
"""
|
||||
logging_handling.logging_level_handling.logging_level
|
||||
"""
|
||||
|
||||
import logging
|
||||
import pytest
|
||||
# from enum import Enum
|
||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||
|
||||
|
||||
class TestLoggingLevelEnum:
|
||||
"""Test the LoggingLevel enum values and basic functionality."""
|
||||
|
||||
def test_enum_values(self):
|
||||
"""Test that all enum values match expected logging levels."""
|
||||
assert LoggingLevel.NOTSET.value == 0
|
||||
assert LoggingLevel.DEBUG.value == 10
|
||||
assert LoggingLevel.INFO.value == 20
|
||||
assert LoggingLevel.WARNING.value == 30
|
||||
assert LoggingLevel.ERROR.value == 40
|
||||
assert LoggingLevel.CRITICAL.value == 50
|
||||
assert LoggingLevel.ALERT.value == 55
|
||||
assert LoggingLevel.EMERGENCY.value == 60
|
||||
assert LoggingLevel.EXCEPTION.value == 70
|
||||
assert LoggingLevel.WARN.value == 30
|
||||
assert LoggingLevel.FATAL.value == 50
|
||||
|
||||
def test_enum_corresponds_to_logging_module(self):
|
||||
"""Test that enum values correspond to logging module constants."""
|
||||
assert LoggingLevel.NOTSET.value == logging.NOTSET
|
||||
assert LoggingLevel.DEBUG.value == logging.DEBUG
|
||||
assert LoggingLevel.INFO.value == logging.INFO
|
||||
assert LoggingLevel.WARNING.value == logging.WARNING
|
||||
assert LoggingLevel.ERROR.value == logging.ERROR
|
||||
assert LoggingLevel.CRITICAL.value == logging.CRITICAL
|
||||
assert LoggingLevel.WARN.value == logging.WARN
|
||||
assert LoggingLevel.FATAL.value == logging.FATAL
|
||||
|
||||
def test_enum_aliases(self):
|
||||
"""Test that aliases point to correct values."""
|
||||
assert LoggingLevel.WARN.value == LoggingLevel.WARNING.value
|
||||
assert LoggingLevel.FATAL.value == LoggingLevel.CRITICAL.value
|
||||
|
||||
|
||||
class TestFromString:
|
||||
"""Test the from_string classmethod."""
|
||||
|
||||
def test_from_string_valid_cases(self):
|
||||
"""Test from_string with valid string inputs."""
|
||||
assert LoggingLevel.from_string("DEBUG") == LoggingLevel.DEBUG
|
||||
assert LoggingLevel.from_string("info") == LoggingLevel.INFO
|
||||
assert LoggingLevel.from_string("Warning") == LoggingLevel.WARNING
|
||||
assert LoggingLevel.from_string("ERROR") == LoggingLevel.ERROR
|
||||
assert LoggingLevel.from_string("critical") == LoggingLevel.CRITICAL
|
||||
assert LoggingLevel.from_string("EXCEPTION") == LoggingLevel.EXCEPTION
|
||||
assert LoggingLevel.from_string("warn") == LoggingLevel.WARN
|
||||
assert LoggingLevel.from_string("FATAL") == LoggingLevel.FATAL
|
||||
|
||||
def test_from_string_invalid_cases(self):
|
||||
"""Test from_string with invalid string inputs."""
|
||||
with pytest.raises(ValueError, match="Invalid log level: invalid"):
|
||||
LoggingLevel.from_string("invalid")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid log level: "):
|
||||
LoggingLevel.from_string("")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid log level: 123"):
|
||||
LoggingLevel.from_string("123")
|
||||
|
||||
|
||||
class TestFromInt:
|
||||
"""Test the from_int classmethod."""
|
||||
|
||||
def test_from_int_valid_cases(self):
|
||||
"""Test from_int with valid integer inputs."""
|
||||
assert LoggingLevel.from_int(0) == LoggingLevel.NOTSET
|
||||
assert LoggingLevel.from_int(10) == LoggingLevel.DEBUG
|
||||
assert LoggingLevel.from_int(20) == LoggingLevel.INFO
|
||||
assert LoggingLevel.from_int(30) == LoggingLevel.WARNING
|
||||
assert LoggingLevel.from_int(40) == LoggingLevel.ERROR
|
||||
assert LoggingLevel.from_int(50) == LoggingLevel.CRITICAL
|
||||
assert LoggingLevel.from_int(55) == LoggingLevel.ALERT
|
||||
assert LoggingLevel.from_int(60) == LoggingLevel.EMERGENCY
|
||||
assert LoggingLevel.from_int(70) == LoggingLevel.EXCEPTION
|
||||
|
||||
def test_from_int_invalid_cases(self):
|
||||
"""Test from_int with invalid integer inputs."""
|
||||
with pytest.raises(ValueError, match="Invalid log level: 999"):
|
||||
LoggingLevel.from_int(999)
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid log level: -1"):
|
||||
LoggingLevel.from_int(-1)
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid log level: 15"):
|
||||
LoggingLevel.from_int(15)
|
||||
|
||||
|
||||
class TestFromAny:
|
||||
"""Test the from_any classmethod."""
|
||||
|
||||
def test_from_any_with_logging_level(self):
|
||||
"""Test from_any when input is already a LoggingLevel."""
|
||||
level = LoggingLevel.INFO
|
||||
assert LoggingLevel.from_any(level) == LoggingLevel.INFO
|
||||
assert LoggingLevel.from_any(level) is level
|
||||
|
||||
def test_from_any_with_int(self):
|
||||
"""Test from_any with integer input."""
|
||||
assert LoggingLevel.from_any(10) == LoggingLevel.DEBUG
|
||||
assert LoggingLevel.from_any(20) == LoggingLevel.INFO
|
||||
assert LoggingLevel.from_any(30) == LoggingLevel.WARNING
|
||||
|
||||
def test_from_any_with_string(self):
|
||||
"""Test from_any with string input."""
|
||||
assert LoggingLevel.from_any("DEBUG") == LoggingLevel.DEBUG
|
||||
assert LoggingLevel.from_any("info") == LoggingLevel.INFO
|
||||
assert LoggingLevel.from_any("Warning") == LoggingLevel.WARNING
|
||||
|
||||
def test_from_any_with_invalid_types(self):
|
||||
"""Test from_any with invalid input types."""
|
||||
with pytest.raises(ValueError):
|
||||
LoggingLevel.from_any(None)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
LoggingLevel.from_any([])
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
LoggingLevel.from_any({})
|
||||
|
||||
def test_from_any_with_invalid_values(self):
|
||||
"""Test from_any with invalid values."""
|
||||
with pytest.raises(ValueError):
|
||||
LoggingLevel.from_any("invalid_level")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
LoggingLevel.from_any(999)
|
||||
|
||||
|
||||
class TestToLoggingLevel:
|
||||
"""Test the to_logging_level method."""
|
||||
|
||||
def test_to_logging_level(self):
|
||||
"""Test conversion to logging module level."""
|
||||
assert LoggingLevel.DEBUG.to_logging_level() == 10
|
||||
assert LoggingLevel.INFO.to_logging_level() == 20
|
||||
assert LoggingLevel.WARNING.to_logging_level() == 30
|
||||
assert LoggingLevel.ERROR.to_logging_level() == 40
|
||||
assert LoggingLevel.CRITICAL.to_logging_level() == 50
|
||||
assert LoggingLevel.ALERT.to_logging_level() == 55
|
||||
assert LoggingLevel.EMERGENCY.to_logging_level() == 60
|
||||
assert LoggingLevel.EXCEPTION.to_logging_level() == 70
|
||||
|
||||
|
||||
class TestToLowerCase:
|
||||
"""Test the to_lower_case method."""
|
||||
|
||||
def test_to_lower_case(self):
|
||||
"""Test conversion to lowercase."""
|
||||
assert LoggingLevel.DEBUG.to_lower_case() == "debug"
|
||||
assert LoggingLevel.INFO.to_lower_case() == "info"
|
||||
assert LoggingLevel.WARNING.to_lower_case() == "warning"
|
||||
assert LoggingLevel.ERROR.to_lower_case() == "error"
|
||||
assert LoggingLevel.CRITICAL.to_lower_case() == "critical"
|
||||
assert LoggingLevel.ALERT.to_lower_case() == "alert"
|
||||
assert LoggingLevel.EMERGENCY.to_lower_case() == "emergency"
|
||||
assert LoggingLevel.EXCEPTION.to_lower_case() == "exception"
|
||||
|
||||
|
||||
class TestStrMethod:
|
||||
"""Test the __str__ method."""
|
||||
|
||||
def test_str_method(self):
|
||||
"""Test string representation."""
|
||||
assert str(LoggingLevel.DEBUG) == "DEBUG"
|
||||
assert str(LoggingLevel.INFO) == "INFO"
|
||||
assert str(LoggingLevel.WARNING) == "WARNING"
|
||||
assert str(LoggingLevel.ERROR) == "ERROR"
|
||||
assert str(LoggingLevel.CRITICAL) == "CRITICAL"
|
||||
assert str(LoggingLevel.ALERT) == "ALERT"
|
||||
assert str(LoggingLevel.EMERGENCY) == "EMERGENCY"
|
||||
assert str(LoggingLevel.EXCEPTION) == "EXCEPTION"
|
||||
|
||||
|
||||
class TestIncludes:
|
||||
"""Test the includes method."""
|
||||
|
||||
def test_includes_valid_cases(self):
|
||||
"""Test includes method with valid cases."""
|
||||
# DEBUG includes all levels
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.DEBUG)
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.INFO)
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.WARNING)
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.ERROR)
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.CRITICAL)
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.ALERT)
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.EMERGENCY)
|
||||
assert LoggingLevel.DEBUG.includes(LoggingLevel.EXCEPTION)
|
||||
|
||||
# INFO includes INFO and higher
|
||||
assert LoggingLevel.INFO.includes(LoggingLevel.INFO)
|
||||
assert LoggingLevel.INFO.includes(LoggingLevel.WARNING)
|
||||
assert LoggingLevel.INFO.includes(LoggingLevel.ERROR)
|
||||
assert LoggingLevel.INFO.includes(LoggingLevel.CRITICAL)
|
||||
assert LoggingLevel.INFO.includes(LoggingLevel.ALERT)
|
||||
assert LoggingLevel.INFO.includes(LoggingLevel.EMERGENCY)
|
||||
assert LoggingLevel.INFO.includes(LoggingLevel.EXCEPTION)
|
||||
|
||||
# INFO does not include DEBUG
|
||||
assert not LoggingLevel.INFO.includes(LoggingLevel.DEBUG)
|
||||
|
||||
# ERROR includes ERROR and higher
|
||||
assert LoggingLevel.ERROR.includes(LoggingLevel.ERROR)
|
||||
assert LoggingLevel.ERROR.includes(LoggingLevel.CRITICAL)
|
||||
assert LoggingLevel.ERROR.includes(LoggingLevel.ALERT)
|
||||
assert LoggingLevel.ERROR.includes(LoggingLevel.EMERGENCY)
|
||||
assert LoggingLevel.ERROR.includes(LoggingLevel.EXCEPTION)
|
||||
|
||||
# ERROR does not include lower levels
|
||||
assert not LoggingLevel.ERROR.includes(LoggingLevel.DEBUG)
|
||||
assert not LoggingLevel.ERROR.includes(LoggingLevel.INFO)
|
||||
assert not LoggingLevel.ERROR.includes(LoggingLevel.WARNING)
|
||||
|
||||
|
||||
class TestIsHigherThan:
|
||||
"""Test the is_higher_than method."""
|
||||
|
||||
def test_is_higher_than(self):
|
||||
"""Test is_higher_than method."""
|
||||
assert LoggingLevel.ERROR.is_higher_than(LoggingLevel.WARNING)
|
||||
assert LoggingLevel.CRITICAL.is_higher_than(LoggingLevel.ERROR)
|
||||
assert LoggingLevel.ALERT.is_higher_than(LoggingLevel.CRITICAL)
|
||||
assert LoggingLevel.EMERGENCY.is_higher_than(LoggingLevel.ALERT)
|
||||
assert LoggingLevel.EXCEPTION.is_higher_than(LoggingLevel.EMERGENCY)
|
||||
assert LoggingLevel.INFO.is_higher_than(LoggingLevel.DEBUG)
|
||||
|
||||
# Same level should return False
|
||||
assert not LoggingLevel.INFO.is_higher_than(LoggingLevel.INFO)
|
||||
|
||||
# Lower level should return False
|
||||
assert not LoggingLevel.DEBUG.is_higher_than(LoggingLevel.INFO)
|
||||
|
||||
|
||||
class TestIsLowerThan:
|
||||
"""Test the is_lower_than method - Note: there seems to be a bug in the original implementation."""
|
||||
|
||||
def test_is_lower_than_expected_behavior(self):
|
||||
"""Test what the is_lower_than method should do (based on method name)."""
|
||||
# Note: The original implementation has a bug - it uses > instead of <
|
||||
# This test shows what the expected behavior should be
|
||||
|
||||
# Based on the method name, these should be true:
|
||||
# assert LoggingLevel.DEBUG.is_lower_than(LoggingLevel.INFO)
|
||||
# assert LoggingLevel.INFO.is_lower_than(LoggingLevel.WARNING)
|
||||
# assert LoggingLevel.WARNING.is_lower_than(LoggingLevel.ERROR)
|
||||
|
||||
# However, due to the bug in implementation (using > instead of <),
|
||||
# the method actually behaves the same as is_higher_than
|
||||
pass
|
||||
|
||||
def test_is_lower_than_actual_behavior(self):
|
||||
"""Test the actual behavior of is_lower_than method."""
|
||||
# Due to the bug, this method behaves like is_higher_than
|
||||
assert LoggingLevel.DEBUG.is_lower_than(LoggingLevel.INFO)
|
||||
assert LoggingLevel.INFO.is_lower_than(LoggingLevel.WARNING)
|
||||
assert LoggingLevel.WARNING.is_lower_than(LoggingLevel.ERROR)
|
||||
assert LoggingLevel.ERROR.is_lower_than(LoggingLevel.CRITICAL)
|
||||
assert LoggingLevel.CRITICAL.is_lower_than(LoggingLevel.ALERT)
|
||||
assert LoggingLevel.ALERT.is_lower_than(LoggingLevel.EMERGENCY)
|
||||
assert LoggingLevel.EMERGENCY.is_lower_than(LoggingLevel.EXCEPTION)
|
||||
|
||||
# Same level should return False
|
||||
assert not LoggingLevel.INFO.is_lower_than(LoggingLevel.INFO)
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and error conditions."""
|
||||
|
||||
def test_none_inputs(self):
|
||||
"""Test handling of None inputs."""
|
||||
with pytest.raises((ValueError, AttributeError)):
|
||||
LoggingLevel.from_string(None)
|
||||
|
||||
with pytest.raises((ValueError, TypeError)):
|
||||
LoggingLevel.from_int(None)
|
||||
|
||||
def test_type_errors(self):
|
||||
"""Test type error conditions."""
|
||||
with pytest.raises((ValueError, AttributeError)):
|
||||
LoggingLevel.from_string(123)
|
||||
|
||||
with pytest.raises((ValueError, TypeError)):
|
||||
LoggingLevel.from_int("string")
|
||||
|
||||
|
||||
# Integration tests
|
||||
class TestIntegration:
|
||||
"""Integration tests combining multiple methods."""
|
||||
|
||||
def test_round_trip_conversions(self):
|
||||
"""Test round-trip conversions work correctly."""
|
||||
original_levels = [
|
||||
LoggingLevel.DEBUG, LoggingLevel.INFO, LoggingLevel.WARNING,
|
||||
LoggingLevel.ERROR, LoggingLevel.CRITICAL, LoggingLevel.ALERT,
|
||||
LoggingLevel.EXCEPTION, LoggingLevel.EXCEPTION
|
||||
]
|
||||
|
||||
for level in original_levels:
|
||||
# Test int round-trip
|
||||
assert LoggingLevel.from_int(level.value) == level
|
||||
|
||||
# Test string round-trip
|
||||
assert LoggingLevel.from_string(level.name) == level
|
||||
|
||||
# Test from_any round-trip
|
||||
assert LoggingLevel.from_any(level) == level
|
||||
assert LoggingLevel.from_any(level.value) == level
|
||||
assert LoggingLevel.from_any(level.name) == level
|
||||
|
||||
def test_level_hierarchy(self):
|
||||
"""Test that level hierarchy works correctly."""
|
||||
levels = [
|
||||
LoggingLevel.NOTSET, LoggingLevel.DEBUG, LoggingLevel.INFO,
|
||||
LoggingLevel.WARNING, LoggingLevel.ERROR, LoggingLevel.CRITICAL,
|
||||
LoggingLevel.ALERT, LoggingLevel.EMERGENCY, LoggingLevel.EXCEPTION
|
||||
]
|
||||
|
||||
for i, level in enumerate(levels):
|
||||
for j, other_level in enumerate(levels):
|
||||
if i < j:
|
||||
assert level.includes(other_level)
|
||||
assert other_level.is_higher_than(level)
|
||||
elif i > j:
|
||||
assert not level.includes(other_level)
|
||||
assert level.is_higher_than(other_level)
|
||||
else:
|
||||
assert level.includes(other_level) # Same level includes itself
|
||||
assert not level.is_higher_than(other_level) # Same level is not higher
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__])
|
||||
@@ -2,9 +2,11 @@
|
||||
PyTest: string_handling/string_helpers
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from textwrap import shorten
|
||||
from corelibs.string_handling.string_helpers import shorten_string, left_fill, format_number
|
||||
import pytest
|
||||
from corelibs.string_handling.string_helpers import (
|
||||
shorten_string, left_fill, format_number
|
||||
)
|
||||
|
||||
|
||||
class TestShortenString:
|
||||
|
||||
241
tests/unit/string_handling/test_var_helpers.py
Normal file
241
tests/unit/string_handling/test_var_helpers.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""
|
||||
var helpers
|
||||
"""
|
||||
|
||||
# ADDED 2025/7/11 Replace 'your_module' with actual module name
|
||||
|
||||
from typing import Any
|
||||
import pytest
|
||||
from corelibs.var_handling.var_helpers import is_int, is_float, str_to_bool
|
||||
|
||||
|
||||
class TestIsInt:
|
||||
"""Test cases for is_int function"""
|
||||
|
||||
def test_valid_integers(self):
|
||||
"""Test with valid integer strings"""
|
||||
assert is_int("123") is True
|
||||
assert is_int("0") is True
|
||||
assert is_int("-456") is True
|
||||
assert is_int("+789") is True
|
||||
assert is_int("000") is True
|
||||
|
||||
def test_invalid_integers(self):
|
||||
"""Test with invalid integer strings"""
|
||||
assert is_int("12.34") is False
|
||||
assert is_int("abc") is False
|
||||
assert is_int("12a") is False
|
||||
assert is_int("") is False
|
||||
assert is_int(" ") is False
|
||||
assert is_int("12.0") is False
|
||||
assert is_int("1e5") is False
|
||||
|
||||
def test_numeric_types(self):
|
||||
"""Test with actual numeric types"""
|
||||
assert is_int(123) is True
|
||||
assert is_int(0) is True
|
||||
assert is_int(-456) is True
|
||||
assert is_int(12.34) is True # float can be converted to int
|
||||
assert is_int(12.0) is True
|
||||
|
||||
def test_other_types(self):
|
||||
"""Test with other data types"""
|
||||
assert is_int(None) is False
|
||||
assert is_int([]) is False
|
||||
assert is_int({}) is False
|
||||
assert is_int(True) is True # bool is subclass of int
|
||||
assert is_int(False) is True
|
||||
|
||||
|
||||
class TestIsFloat:
|
||||
"""Test cases for is_float function"""
|
||||
|
||||
def test_valid_floats(self):
|
||||
"""Test with valid float strings"""
|
||||
assert is_float("12.34") is True
|
||||
assert is_float("0.0") is True
|
||||
assert is_float("-45.67") is True
|
||||
assert is_float("+78.9") is True
|
||||
assert is_float("123") is True # integers are valid floats
|
||||
assert is_float("0") is True
|
||||
assert is_float("1e5") is True
|
||||
assert is_float("1.5e-10") is True
|
||||
assert is_float("inf") is True
|
||||
assert is_float("-inf") is True
|
||||
assert is_float("nan") is True
|
||||
|
||||
def test_invalid_floats(self):
|
||||
"""Test with invalid float strings"""
|
||||
assert is_float("abc") is False
|
||||
assert is_float("12.34.56") is False
|
||||
assert is_float("12a") is False
|
||||
assert is_float("") is False
|
||||
assert is_float(" ") is False
|
||||
assert is_float("12..34") is False
|
||||
|
||||
def test_numeric_types(self):
|
||||
"""Test with actual numeric types"""
|
||||
assert is_float(123) is True
|
||||
assert is_float(12.34) is True
|
||||
assert is_float(0) is True
|
||||
assert is_float(-45.67) is True
|
||||
|
||||
def test_other_types(self):
|
||||
"""Test with other data types"""
|
||||
assert is_float(None) is False
|
||||
assert is_float([]) is False
|
||||
assert is_float({}) is False
|
||||
assert is_float(True) is True # bool can be converted to float
|
||||
assert is_float(False) is True
|
||||
|
||||
|
||||
class TestStrToBool:
|
||||
"""Test cases for str_to_bool function"""
|
||||
|
||||
def test_valid_true_strings(self):
|
||||
"""Test with valid true strings"""
|
||||
assert str_to_bool("True") is True
|
||||
assert str_to_bool("true") is True
|
||||
|
||||
def test_valid_false_strings(self):
|
||||
"""Test with valid false strings"""
|
||||
assert str_to_bool("False") is False
|
||||
assert str_to_bool("false") is False
|
||||
|
||||
def test_invalid_strings(self):
|
||||
"""Test with invalid boolean strings"""
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("TRUE")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("FALSE")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("yes")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("no")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("1")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("0")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool(" True")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid boolean string"):
|
||||
str_to_bool("True ")
|
||||
|
||||
def test_error_message_content(self):
|
||||
"""Test that error messages contain the invalid input"""
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
str_to_bool("invalid")
|
||||
assert "Invalid boolean string: invalid" in str(exc_info.value)
|
||||
|
||||
def test_case_sensitivity(self):
|
||||
"""Test that function is case sensitive"""
|
||||
with pytest.raises(ValueError):
|
||||
str_to_bool("TRUE")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
str_to_bool("True ") # with space
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
str_to_bool(" True") # with space
|
||||
|
||||
|
||||
# Additional edge case tests
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and special scenarios"""
|
||||
|
||||
def test_is_int_with_whitespace(self):
|
||||
"""Test is_int with whitespace (should work due to int() behavior)"""
|
||||
assert is_int(" 123 ") is True
|
||||
assert is_int("\t456\n") is True
|
||||
|
||||
def test_is_float_with_whitespace(self):
|
||||
"""Test is_float with whitespace (should work due to float() behavior)"""
|
||||
assert is_float(" 12.34 ") is True
|
||||
assert is_float("\t45.67\n") is True
|
||||
|
||||
def test_large_numbers(self):
|
||||
"""Test with very large numbers"""
|
||||
large_int = "123456789012345678901234567890"
|
||||
assert is_int(large_int) is True
|
||||
assert is_float(large_int) is True
|
||||
|
||||
def test_scientific_notation(self):
|
||||
"""Test scientific notation"""
|
||||
assert is_int("1e5") is False # int() doesn't handle scientific notation
|
||||
assert is_float("1e5") is True
|
||||
assert is_float("1.5e-10") is True
|
||||
assert is_float("2E+3") is True
|
||||
|
||||
|
||||
# Parametrized tests for more comprehensive coverage
|
||||
class TestParametrized:
|
||||
"""Parametrized tests for better coverage"""
|
||||
|
||||
@pytest.mark.parametrize("value,expected", [
|
||||
("123", True),
|
||||
("0", True),
|
||||
("-456", True),
|
||||
("12.34", False),
|
||||
("abc", False),
|
||||
("", False),
|
||||
(123, True),
|
||||
(12.5, True),
|
||||
(None, False),
|
||||
])
|
||||
def test_is_int_parametrized(self, value: Any, expected: bool):
|
||||
"""Test"""
|
||||
assert is_int(value) == expected
|
||||
|
||||
@pytest.mark.parametrize("value,expected", [
|
||||
("12.34", True),
|
||||
("123", True),
|
||||
("0", True),
|
||||
("-45.67", True),
|
||||
("inf", True),
|
||||
("nan", True),
|
||||
("abc", False),
|
||||
("", False),
|
||||
(12.34, True),
|
||||
(123, True),
|
||||
(None, False),
|
||||
])
|
||||
def test_is_float_parametrized(self, value: Any, expected: bool):
|
||||
"""test"""
|
||||
assert is_float(value) == expected
|
||||
|
||||
@pytest.mark.parametrize("value,expected", [
|
||||
("True", True),
|
||||
("true", True),
|
||||
("False", False),
|
||||
("false", False),
|
||||
])
|
||||
def test_str_to_bool_valid_parametrized(self, value: Any, expected: bool):
|
||||
"""test"""
|
||||
assert str_to_bool(value) == expected
|
||||
|
||||
@pytest.mark.parametrize("invalid_value", [
|
||||
"TRUE",
|
||||
"FALSE",
|
||||
"yes",
|
||||
"no",
|
||||
"1",
|
||||
"0",
|
||||
"",
|
||||
" True",
|
||||
"True ",
|
||||
"invalid",
|
||||
])
|
||||
def test_str_to_bool_invalid_parametrized(self, invalid_value: Any):
|
||||
"""test"""
|
||||
with pytest.raises(ValueError):
|
||||
str_to_bool(invalid_value)
|
||||
Reference in New Issue
Block a user