Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3cd3f87d68 | ||
|
|
582937b866 | ||
|
|
2b8240c156 | ||
|
|
abf4b7ac89 | ||
|
|
9c49f83c16 | ||
|
|
3a625ed0ee | ||
|
|
2cfbf4bb90 | ||
|
|
5767533668 | ||
|
|
24798f19ca | ||
|
|
26f8249187 | ||
|
|
dcefa564da | ||
|
|
edd35dccea | ||
|
|
ea527ea60c | ||
|
|
fd5e1db22b | ||
|
|
39e23faf7f | ||
|
|
de285b531a | ||
|
|
0a29a592f9 | ||
|
|
e045b1d3b5 | ||
|
|
280e5fa861 | ||
|
|
472d3495b5 | ||
|
|
2778ac6870 | ||
|
|
743a0a8ac9 | ||
|
|
694712ed2e | ||
|
|
ea3b4f1790 | ||
|
|
da68818d4f |
4
ToDo.md
4
ToDo.md
@@ -1,5 +1,5 @@
|
|||||||
# ToDo list
|
# ToDo list
|
||||||
|
|
||||||
- [ ] stub files .pyi
|
- [x] stub files .pyi
|
||||||
- [ ] Add tests for all, we need 100% test coverate
|
- [ ] Add tests for all, we need 100% test coverate
|
||||||
- [ ] Log: add custom format for "stack_correct" if set, this will override the normal stack block
|
- [x] Log: add custom format for "stack_correct" if set, this will override the normal stack block
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
# MARK: Project info
|
# MARK: Project info
|
||||||
[project]
|
[project]
|
||||||
name = "corelibs"
|
name = "corelibs"
|
||||||
version = "0.13.1"
|
version = "0.18.2"
|
||||||
description = "Collection of utils for Python scripts"
|
description = "Collection of utils for Python scripts"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.13"
|
requires-python = ">=3.13"
|
||||||
|
|||||||
@@ -5,10 +5,33 @@ List of regex compiled strings that can be used
|
|||||||
import re
|
import re
|
||||||
|
|
||||||
|
|
||||||
EMAIL_REGEX_BASIC = r"""
|
def compile_re(reg: str) -> re.Pattern[str]:
|
||||||
|
"""
|
||||||
|
compile a regex with verbose flag
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
reg {str} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
re.Pattern[str] -- _description_
|
||||||
|
"""
|
||||||
|
return re.compile(reg, re.VERBOSE)
|
||||||
|
|
||||||
|
|
||||||
|
# email regex
|
||||||
|
EMAIL_BASIC_REGEX: str = r"""
|
||||||
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
||||||
"""
|
"""
|
||||||
EMAIL_REGEX_BASIC_COMPILED = re.compile(EMAIL_REGEX_BASIC)
|
# Domain regex with localhost
|
||||||
|
DOMAIN_WITH_LOCALHOST_REGEX: str = r"""
|
||||||
|
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})$
|
||||||
|
"""
|
||||||
|
# domain regex with loclhost and optional port
|
||||||
|
DOMAIN_WITH_LOCALHOST_PORT_REGEX: str = r"""
|
||||||
|
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})(?::\d+)?$
|
||||||
|
"""
|
||||||
|
# Domain, no localhost
|
||||||
|
DOMAIN_REGEX: str = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$"
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ Class of checks that can be run on value entries
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import TypedDict
|
from typing import TypedDict
|
||||||
from corelibs.check_handling.regex_constants import EMAIL_REGEX_BASIC
|
from corelibs.check_handling.regex_constants import (
|
||||||
|
EMAIL_BASIC_REGEX, DOMAIN_WITH_LOCALHOST_REGEX, DOMAIN_WITH_LOCALHOST_PORT_REGEX, DOMAIN_REGEX
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SettingsLoaderCheckValue(TypedDict):
|
class SettingsLoaderCheckValue(TypedDict):
|
||||||
@@ -45,27 +47,25 @@ class SettingsLoaderCheck:
|
|||||||
},
|
},
|
||||||
# This does a baisc email check, only alphanumeric with special characters
|
# This does a baisc email check, only alphanumeric with special characters
|
||||||
"string.email.basic": {
|
"string.email.basic": {
|
||||||
"regex": EMAIL_REGEX_BASIC,
|
"regex": EMAIL_BASIC_REGEX,
|
||||||
"regex_clean": None,
|
"regex_clean": None,
|
||||||
"replace": "",
|
"replace": "",
|
||||||
},
|
},
|
||||||
# Domain check, including localhost no port
|
# Domain check, including localhost no port
|
||||||
"string.domain.with-localhost": {
|
"string.domain.with-localhost": {
|
||||||
"regex": r"^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})$",
|
"regex": DOMAIN_WITH_LOCALHOST_REGEX,
|
||||||
"regex_clean": None,
|
"regex_clean": None,
|
||||||
"replace": "",
|
"replace": "",
|
||||||
},
|
},
|
||||||
# Domain check, with localhost and port
|
# Domain check, with localhost and port
|
||||||
"string.domain.with-localhost.port": {
|
"string.domain.with-localhost.port": {
|
||||||
"regex": r"""
|
"regex": DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||||
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})(?::\d+)?$
|
|
||||||
""",
|
|
||||||
"regex_clean": None,
|
"regex_clean": None,
|
||||||
"replace": "",
|
"replace": "",
|
||||||
},
|
},
|
||||||
# Domain check, no pure localhost allowed
|
# Domain check, no pure localhost allowed
|
||||||
"string.domain": {
|
"string.domain": {
|
||||||
"regex": r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$",
|
"regex": DOMAIN_REGEX,
|
||||||
"regex_clean": None,
|
"regex_clean": None,
|
||||||
"replace": "",
|
"replace": "",
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -6,28 +6,39 @@ import traceback
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
def traceback_call_str(start: int = 2, depth: int = 1):
|
def call_stack(
|
||||||
|
start: int = 0,
|
||||||
|
skip_last: int = -1,
|
||||||
|
separator: str = ' -> ',
|
||||||
|
reset_start_if_empty: bool = False
|
||||||
|
) -> str:
|
||||||
"""
|
"""
|
||||||
get the trace for the last entry
|
get the trace for the last entry
|
||||||
|
|
||||||
Keyword Arguments:
|
Keyword Arguments:
|
||||||
start {int} -- _description_ (default: {2})
|
start {int} -- start, if too might output will empty until reset_start_if_empty is set (default: {0})
|
||||||
depth {int} -- _description_ (default: {1})
|
skip_last {int} -- how many of the last are skipped, defaults to -1 for current method (default: {-1})
|
||||||
|
seperator {str} -- add stack separator, if empty defaults to ' -> ' (default: { -> })
|
||||||
|
reset_start_if_empty {bool} -- if no stack returned because of too high start,
|
||||||
|
reset to 0 for full read (default: {False})
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
_type_ -- _description_
|
str -- _description_
|
||||||
"""
|
"""
|
||||||
# can't have more than in the stack for depth
|
# stack = traceback.extract_stack()[start:depth]
|
||||||
depth = min(depth, start)
|
# how many of the last entries we skip (so we do not get self), default is -1
|
||||||
depth = start - depth
|
# start cannot be negative
|
||||||
# 0 is full stack length from start
|
if skip_last > 0:
|
||||||
if depth == 0:
|
skip_last = skip_last * -1
|
||||||
stack = traceback.extract_stack()[-start:]
|
stack = traceback.extract_stack()
|
||||||
else:
|
__stack = stack[start:skip_last]
|
||||||
stack = traceback.extract_stack()[-start:-depth]
|
# start possible to high, reset start to 0
|
||||||
return ' -> '.join(
|
if not __stack and reset_start_if_empty:
|
||||||
f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}"
|
start = 0
|
||||||
for f in stack
|
__stack = stack[start:skip_last]
|
||||||
)
|
if not separator:
|
||||||
|
separator = ' -> '
|
||||||
|
# print(f"* HERE: {dump_data(stack)}")
|
||||||
|
return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in __stack)
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import json
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
def dump_data(data: dict[Any, Any] | list[Any] | str | None) -> str:
|
def dump_data(data: Any) -> str:
|
||||||
"""
|
"""
|
||||||
dump formated output from dict/list
|
dump formated output from dict/list
|
||||||
|
|
||||||
|
|||||||
23
src/corelibs/exceptions/csv_exceptions.py
Normal file
23
src/corelibs/exceptions/csv_exceptions.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
"""
|
||||||
|
Exceptions for csv file reading and processing
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class NoCsvReader(Exception):
|
||||||
|
"""
|
||||||
|
CSV reader is none
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class CsvHeaderDataMissing(Exception):
|
||||||
|
"""
|
||||||
|
The csv reader returned None as headers, the header column in the csv file is missing
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class CompulsoryCsvHeaderCheckFailed(Exception):
|
||||||
|
"""
|
||||||
|
raise if the header is not matching to the excpeted values
|
||||||
|
"""
|
||||||
|
|
||||||
|
# __END__
|
||||||
@@ -2,23 +2,40 @@
|
|||||||
wrapper around search path
|
wrapper around search path
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import Any
|
from typing import Any, TypedDict, NotRequired
|
||||||
|
from warnings import deprecated
|
||||||
|
|
||||||
|
|
||||||
|
class ArraySearchList(TypedDict):
|
||||||
|
"""find in array from list search dict"""
|
||||||
|
key: str
|
||||||
|
value: str | bool | int | float | list[str | None]
|
||||||
|
case_sensitive: NotRequired[bool]
|
||||||
|
|
||||||
|
|
||||||
|
@deprecated("Use find_in_array_from_list()")
|
||||||
def array_search(
|
def array_search(
|
||||||
search_params: list[dict[str, str | bool | list[str | None]]],
|
search_params: list[ArraySearchList],
|
||||||
data: list[dict[str, Any]],
|
data: list[dict[str, Any]],
|
||||||
return_index: bool = False
|
return_index: bool = False
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""depreacted, old call order"""
|
||||||
|
return find_in_array_from_list(data, search_params, return_index)
|
||||||
|
|
||||||
|
def find_in_array_from_list(
|
||||||
|
data: list[dict[str, Any]],
|
||||||
|
search_params: list[ArraySearchList],
|
||||||
|
return_index: bool = False
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
search in an array of dicts with an array of Key/Value set
|
search in an list of dicts with an list of Key/Value set
|
||||||
all Key/Value sets must match
|
all Key/Value sets must match
|
||||||
Value set can be list for OR match
|
Value set can be list for OR match
|
||||||
option: case_senstive: default True
|
option: case_senstive: default True
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
search_params (list): List of search params in "Key"/"Value" lists with options
|
|
||||||
data (list): data to search in, must be a list
|
data (list): data to search in, must be a list
|
||||||
|
search_params (list): List of search params in "key"/"value" lists with options
|
||||||
return_index (bool): return index of list [default False]
|
return_index (bool): return index of list [default False]
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
@@ -32,18 +49,20 @@ def array_search(
|
|||||||
"""
|
"""
|
||||||
if not isinstance(search_params, list): # type: ignore
|
if not isinstance(search_params, list): # type: ignore
|
||||||
raise ValueError("search_params must be a list")
|
raise ValueError("search_params must be a list")
|
||||||
keys = []
|
keys: list[str] = []
|
||||||
|
# check that key and value exist and are set
|
||||||
for search in search_params:
|
for search in search_params:
|
||||||
if not search.get('Key') or not search.get('Value'):
|
if not search.get('key') or not search.get('value'):
|
||||||
raise KeyError(
|
raise KeyError(
|
||||||
f"Either Key '{search.get('Key', '')}' or "
|
f"Either Key '{search.get('key', '')}' or "
|
||||||
f"Value '{search.get('Value', '')}' is missing or empty"
|
f"Value '{search.get('value', '')}' is missing or empty"
|
||||||
)
|
)
|
||||||
# if double key -> abort
|
# if double key -> abort
|
||||||
if search.get("Key") in keys:
|
if search.get("key") in keys:
|
||||||
raise KeyError(
|
raise KeyError(
|
||||||
f"Key {search.get('Key', '')} already exists in search_params"
|
f"Key {search.get('key', '')} already exists in search_params"
|
||||||
)
|
)
|
||||||
|
keys.append(str(search['key']))
|
||||||
|
|
||||||
return_items: list[dict[str, Any]] = []
|
return_items: list[dict[str, Any]] = []
|
||||||
for si_idx, search_item in enumerate(data):
|
for si_idx, search_item in enumerate(data):
|
||||||
@@ -55,20 +74,20 @@ def array_search(
|
|||||||
# lower case left side
|
# lower case left side
|
||||||
# TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"]
|
# TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"]
|
||||||
if search.get("case_sensitive", True) is False:
|
if search.get("case_sensitive", True) is False:
|
||||||
search_value = search_item.get(str(search['Key']), "").lower()
|
search_value = search_item.get(str(search['key']), "").lower()
|
||||||
else:
|
else:
|
||||||
search_value = search_item.get(str(search['Key']), "")
|
search_value = search_item.get(str(search['key']), "")
|
||||||
# lower case right side
|
# lower case right side
|
||||||
if isinstance(search['Value'], list):
|
if isinstance(search['value'], list):
|
||||||
search_in = [
|
search_in = [
|
||||||
str(k).lower()
|
str(k).lower()
|
||||||
if search.get("case_sensitive", True) is False else k
|
if search.get("case_sensitive", True) is False else k
|
||||||
for k in search['Value']
|
for k in search['value']
|
||||||
]
|
]
|
||||||
elif search.get("case_sensitive", True) is False:
|
elif search.get("case_sensitive", True) is False:
|
||||||
search_in = str(search['Value']).lower()
|
search_in = str(search['value']).lower()
|
||||||
else:
|
else:
|
||||||
search_in = search['Value']
|
search_in = search['value']
|
||||||
# compare check
|
# compare check
|
||||||
if (
|
if (
|
||||||
(
|
(
|
||||||
|
|||||||
@@ -3,26 +3,36 @@ Dict helpers
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
from typing import Any
|
from typing import TypeAlias, Union, Dict, List, Any, cast
|
||||||
|
|
||||||
|
# definitions for the mask run below
|
||||||
|
MaskableValue: TypeAlias = Union[str, int, float, bool, None]
|
||||||
|
NestedDict: TypeAlias = Dict[str, Union[MaskableValue, List[Any], 'NestedDict']]
|
||||||
|
ProcessableValue: TypeAlias = Union[MaskableValue, List[Any], NestedDict]
|
||||||
|
|
||||||
|
|
||||||
def mask(
|
def mask(
|
||||||
data_set: dict[str, str],
|
data_set: dict[str, Any],
|
||||||
mask_keys: list[str] | None = None,
|
mask_keys: list[str] | None = None,
|
||||||
mask_str: str = "***",
|
mask_str: str = "***",
|
||||||
|
mask_str_edges: str = '_',
|
||||||
skip: bool = False
|
skip: bool = False
|
||||||
) -> dict[str, str]:
|
) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
mask data for output
|
mask data for output
|
||||||
Checks if mask_keys list exist in any key in the data set either from the start or at the end
|
Checks if mask_keys list exist in any key in the data set either from the start or at the end
|
||||||
|
|
||||||
|
Use the mask_str_edges to define how searches inside a string should work. Default it must start
|
||||||
|
and end with '_', remove to search string in string
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
data_set {dict[str, str]} -- _description_
|
data_set {dict[str, str]} -- _description_
|
||||||
|
|
||||||
Keyword Arguments:
|
Keyword Arguments:
|
||||||
mask_keys {list[str] | None} -- _description_ (default: {None})
|
mask_keys {list[str] | None} -- _description_ (default: {None})
|
||||||
mask_str {str} -- _description_ (default: {"***"})
|
mask_str {str} -- _description_ (default: {"***"})
|
||||||
skip {bool} -- _description_ (default: {False})
|
mask_str_edges {str} -- _description_ (default: {"_"})
|
||||||
|
skip {bool} -- if set to true skip (default: {False})
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str] -- _description_
|
dict[str, str] -- _description_
|
||||||
@@ -30,29 +40,46 @@ def mask(
|
|||||||
if skip is True:
|
if skip is True:
|
||||||
return data_set
|
return data_set
|
||||||
if mask_keys is None:
|
if mask_keys is None:
|
||||||
mask_keys = ["password", "secret"]
|
mask_keys = ["encryption", "password", "secret"]
|
||||||
|
else:
|
||||||
|
# make sure it is lower case
|
||||||
|
mask_keys = [mask_key.lower() for mask_key in mask_keys]
|
||||||
|
|
||||||
|
def should_mask_key(key: str) -> bool:
|
||||||
|
"""Check if a key should be masked"""
|
||||||
|
__key_lower = key.lower()
|
||||||
|
return any(
|
||||||
|
__key_lower.startswith(mask_key) or
|
||||||
|
__key_lower.endswith(mask_key) or
|
||||||
|
f"{mask_str_edges}{mask_key}{mask_str_edges}" in __key_lower
|
||||||
|
for mask_key in mask_keys
|
||||||
|
)
|
||||||
|
|
||||||
|
def mask_recursive(obj: ProcessableValue) -> ProcessableValue:
|
||||||
|
"""Recursively mask values in nested structures"""
|
||||||
|
if isinstance(obj, dict):
|
||||||
|
return {
|
||||||
|
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
|
||||||
|
for key, value in obj.items()
|
||||||
|
}
|
||||||
|
if isinstance(obj, list):
|
||||||
|
return [mask_recursive(item) for item in obj]
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def mask_value(value: Any) -> Any:
|
||||||
|
"""Handle masking based on value type"""
|
||||||
|
if isinstance(value, list):
|
||||||
|
# Mask each individual value in the list
|
||||||
|
return [mask_str for _ in cast('list[Any]', value)]
|
||||||
|
if isinstance(value, dict):
|
||||||
|
# Recursively process the dictionary instead of masking the whole thing
|
||||||
|
return mask_recursive(cast('ProcessableValue', value))
|
||||||
|
# Mask primitive values
|
||||||
|
return mask_str
|
||||||
|
|
||||||
return {
|
return {
|
||||||
key: mask_str
|
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
|
||||||
if any(key.startswith(mask_key) or key.endswith(mask_key) for mask_key in mask_keys) else value
|
|
||||||
for key, value in data_set.items()
|
for key, value in data_set.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def set_entry(dict_set: dict[str, Any], key: str, value_set: Any) -> dict[str, Any]:
|
|
||||||
"""
|
|
||||||
set a new entry in the dict set
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
key {str} -- _description_
|
|
||||||
dict_set {dict[str, Any]} -- _description_
|
|
||||||
value_set {Any} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict[str, Any] -- _description_
|
|
||||||
"""
|
|
||||||
if not dict_set.get(key):
|
|
||||||
dict_set[key] = {}
|
|
||||||
dict_set[key] = value_set
|
|
||||||
return dict_set
|
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from pathlib import Path
|
|||||||
from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
||||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||||
from corelibs.string_handling.text_colors import Colors
|
from corelibs.string_handling.text_colors import Colors
|
||||||
from corelibs.debug_handling.debug_helpers import traceback_call_str
|
from corelibs.debug_handling.debug_helpers import call_stack
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from multiprocessing import Queue
|
from multiprocessing import Queue
|
||||||
@@ -20,12 +20,7 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
# MARK: Log settings TypedDict
|
# MARK: Log settings TypedDict
|
||||||
class LogSettings(TypedDict):
|
class LogSettings(TypedDict):
|
||||||
"""
|
"""log settings, for Log setup"""
|
||||||
log settings
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
TypedDict {_type_} -- _description_
|
|
||||||
"""
|
|
||||||
log_level_console: LoggingLevel
|
log_level_console: LoggingLevel
|
||||||
log_level_file: LoggingLevel
|
log_level_file: LoggingLevel
|
||||||
console_enabled: bool
|
console_enabled: bool
|
||||||
@@ -35,6 +30,12 @@ class LogSettings(TypedDict):
|
|||||||
log_queue: 'Queue[str] | None'
|
log_queue: 'Queue[str] | None'
|
||||||
|
|
||||||
|
|
||||||
|
class LoggerInit(TypedDict):
|
||||||
|
"""for Logger init"""
|
||||||
|
logger: logging.Logger
|
||||||
|
log_queue: 'Queue[str] | None'
|
||||||
|
|
||||||
|
|
||||||
# MARK: Custom color filter
|
# MARK: Custom color filter
|
||||||
class CustomConsoleFormatter(logging.Formatter):
|
class CustomConsoleFormatter(logging.Formatter):
|
||||||
"""
|
"""
|
||||||
@@ -73,13 +74,307 @@ class CustomConsoleFormatter(logging.Formatter):
|
|||||||
message = super().format(record)
|
message = super().format(record)
|
||||||
return f"{color}{message}{reset}"
|
return f"{color}{message}{reset}"
|
||||||
|
|
||||||
|
|
||||||
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
|
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
|
||||||
# hasattr(record, 'stack_trace')
|
# hasattr(record, 'stack_trace')
|
||||||
|
# also for something like "context" where we add an array of anything to a message
|
||||||
|
|
||||||
|
|
||||||
# MARK: Log class
|
class CustomHandlerFilter(logging.Filter):
|
||||||
class Log:
|
"""
|
||||||
|
Add a custom handler for filtering
|
||||||
|
"""
|
||||||
|
HANDLER_NAME_FILTER_EXCEPTION: str = 'console'
|
||||||
|
|
||||||
|
def __init__(self, handler_name: str, filter_exceptions: bool = False):
|
||||||
|
super().__init__(name=handler_name)
|
||||||
|
self.handler_name = handler_name
|
||||||
|
self.filter_exceptions = filter_exceptions
|
||||||
|
|
||||||
|
def filter(self, record: logging.LogRecord) -> bool:
|
||||||
|
# if console and exception do not show
|
||||||
|
if self.handler_name == self.HANDLER_NAME_FILTER_EXCEPTION and self.filter_exceptions:
|
||||||
|
return record.levelname != "EXCEPTION"
|
||||||
|
# if cnosole entry is true and traget file filter
|
||||||
|
if hasattr(record, 'console') and getattr(record, 'console') is True and self.handler_name == 'file':
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
# def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||||
|
# return record.levelname != "EXCEPTION"
|
||||||
|
|
||||||
|
|
||||||
|
# MARK: Parent class
|
||||||
|
class LogParent:
|
||||||
|
"""
|
||||||
|
Parent class with general methods
|
||||||
|
used by Log and Logger
|
||||||
|
"""
|
||||||
|
|
||||||
|
# spacer lenght characters and the character
|
||||||
|
SPACER_CHAR: str = '='
|
||||||
|
SPACER_LENGTH: int = 32
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.logger: logging.Logger
|
||||||
|
self.log_queue: 'Queue[str] | None' = None
|
||||||
|
self.handlers: dict[str, Any] = {}
|
||||||
|
|
||||||
|
# FIXME: we need to add a custom formater to add stack level listing if we want to
|
||||||
|
# Important note, although they exist, it is recommended to use self.logger.NAME directly
|
||||||
|
# so that the correct filename, method and row number is set
|
||||||
|
# for > 50 use logger.log(LoggingLevel.<LEVEL>.value, ...)
|
||||||
|
# for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True)
|
||||||
|
# MARK: log message
|
||||||
|
def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None):
|
||||||
|
"""log general"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.log(level, msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: DEBUG 10
|
||||||
|
def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""debug"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.debug(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: INFO 20
|
||||||
|
def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""info"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.info(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: WARNING 30
|
||||||
|
def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""warning"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.warning(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: ERROR 40
|
||||||
|
def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""error"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.error(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: CRITICAL 50
|
||||||
|
def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""critcal"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.critical(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: ALERT 55
|
||||||
|
def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""alert"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
# extra_dict = dict(extra)
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: EMERGECNY: 60
|
||||||
|
def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""emergency"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: EXCEPTION: 70
|
||||||
|
def exception(
|
||||||
|
self,
|
||||||
|
msg: object, *args: object, extra: MutableMapping[str, object] | None = None,
|
||||||
|
log_error: bool = True
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
log on exceotion level, this is log.exception, but logs with a new level
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (object): _description_
|
||||||
|
*args (object): arguments for msg
|
||||||
|
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
||||||
|
log_error: (bool): If set to false will not write additional error message for console (Default True)
|
||||||
|
"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
# write to console first with extra flag for filtering in file
|
||||||
|
if log_error:
|
||||||
|
self.logger.log(
|
||||||
|
LoggingLevel.ERROR.value,
|
||||||
|
f"<=EXCEPTION> {msg}", *args, extra=dict(extra) | {'console': True}, stacklevel=2
|
||||||
|
)
|
||||||
|
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
def break_line(self, info: str = "BREAK"):
|
||||||
|
"""
|
||||||
|
add a break line as info level
|
||||||
|
|
||||||
|
Keyword Arguments:
|
||||||
|
info {str} -- _description_ (default: {"BREAK"})
|
||||||
|
"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
|
||||||
|
|
||||||
|
# MARK: queue handling
|
||||||
|
def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool:
|
||||||
|
"""
|
||||||
|
Flush all pending messages
|
||||||
|
|
||||||
|
Keyword Arguments:
|
||||||
|
handler_name {str | None} -- _description_ (default: {None})
|
||||||
|
timeout {float} -- _description_ (default: {2.0})
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool -- _description_
|
||||||
|
"""
|
||||||
|
if not self.log_queue:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Wait for queue to be processed
|
||||||
|
start_time = time.time()
|
||||||
|
while not self.log_queue.empty() and (time.time() - start_time) < timeout:
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
# Flush all handlers or handler given
|
||||||
|
if handler_name:
|
||||||
|
try:
|
||||||
|
self.handlers[handler_name].flush()
|
||||||
|
except IndexError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
for handler in self.handlers.values():
|
||||||
|
handler.flush()
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
# MARK: log level handling
|
||||||
|
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
|
||||||
|
"""
|
||||||
|
set the logging level for a handler
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
handler {str} -- _description_
|
||||||
|
log_level {LoggingLevel} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool -- _description_
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# flush queue befoe changing logging level
|
||||||
|
self.flush(handler_name)
|
||||||
|
self.handlers[handler_name].setLevel(log_level.name)
|
||||||
|
return True
|
||||||
|
except IndexError:
|
||||||
|
if self.logger:
|
||||||
|
self.logger.error('Handler %s not found, cannot change log level', handler_name)
|
||||||
|
return False
|
||||||
|
except AttributeError:
|
||||||
|
if self.logger:
|
||||||
|
self.logger.error(
|
||||||
|
'Cannot change to log level %s for handler %s, log level invalid',
|
||||||
|
LoggingLevel.name, handler_name
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_log_level(self, handler_name: str) -> LoggingLevel:
|
||||||
|
"""
|
||||||
|
gettthe logging level for a handler
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
handler_name {str} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
LoggingLevel -- _description_
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return LoggingLevel.from_any(self.handlers[handler_name].level)
|
||||||
|
except IndexError:
|
||||||
|
return LoggingLevel.NOTSET
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate_log_level(log_level: Any) -> bool:
|
||||||
|
"""
|
||||||
|
if the log level is invalid will return false, else return true
|
||||||
|
|
||||||
|
Args:
|
||||||
|
log_level (Any): _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: _description_
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
_ = LoggingLevel.from_any(log_level).value
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
@staticmethod
def get_log_level_int(log_level: Any) -> int:
    """
    Resolve a log level to its integer value.

    Falls back to the class default log level when the value is invalid.

    Arguments:
        log_level {Any} -- candidate log level value

    Returns:
        int -- numeric value of the resolved (or default) logging level
    """
    try:
        resolved = LoggingLevel.from_any(log_level)
    except ValueError:
        resolved = LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name)
    return resolved.value
|
||||||
|
|
||||||
|
# MARK: Logger
class Logger(LogParent):
    """
    Lightweight wrapper that can be passed to other classes without
    re-initializing the full Log setup.

    NOTE: if no queue object is handed over, a logging level change might
    not take immediate effect
    """

    def __init__(self, logger_settings: LoggerInit):
        LogParent.__init__(self)
        self.logger = logger_settings['logger']
        # short aliases for convenience access
        self.lg = self.logger
        self.l = self.logger
        # index the attached handlers by their name for direct lookup
        self.handlers = {}
        for attached_handler in self.logger.handlers:
            self.handlers[str(attached_handler.name)] = attached_handler
        self.log_queue = logger_settings['log_queue']
|
||||||
|
# MARK: LogSetup class
|
||||||
|
class Log(LogParent):
|
||||||
"""
|
"""
|
||||||
logger setup
|
logger setup
|
||||||
"""
|
"""
|
||||||
@@ -93,8 +388,8 @@ class Log:
|
|||||||
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
|
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
|
||||||
# default settings
|
# default settings
|
||||||
DEFAULT_LOG_SETTINGS: LogSettings = {
|
DEFAULT_LOG_SETTINGS: LogSettings = {
|
||||||
"log_level_console": LoggingLevel.WARNING,
|
"log_level_console": DEFAULT_LOG_LEVEL_CONSOLE,
|
||||||
"log_level_file": LoggingLevel.DEBUG,
|
"log_level_file": DEFAULT_LOG_LEVEL_FILE,
|
||||||
"console_enabled": True,
|
"console_enabled": True,
|
||||||
"console_color_output_enabled": True,
|
"console_color_output_enabled": True,
|
||||||
"add_start_info": True,
|
"add_start_info": True,
|
||||||
@@ -110,6 +405,7 @@ class Log:
|
|||||||
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
|
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
|
||||||
other_handlers: dict[str, Any] | None = None
|
other_handlers: dict[str, Any] | None = None
|
||||||
):
|
):
|
||||||
|
LogParent.__init__(self)
|
||||||
# add new level for alert, emergecny and exception
|
# add new level for alert, emergecny and exception
|
||||||
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
|
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
|
||||||
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
|
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
|
||||||
@@ -142,13 +438,13 @@ class Log:
|
|||||||
# in the file writer too, for the ones where color is set BEFORE the format
|
# in the file writer too, for the ones where color is set BEFORE the format
|
||||||
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
|
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
|
||||||
self.handlers: dict[str, Any] = {}
|
self.handlers: dict[str, Any] = {}
|
||||||
self.add_handler('file_handler', self.__create_time_rotating_file_handler(
|
self.add_handler('file_handler', self.__create_timed_rotating_file_handler(
|
||||||
self.log_settings['log_level_file'], log_path)
|
'file_handler', self.log_settings['log_level_file'], log_path)
|
||||||
)
|
)
|
||||||
if self.log_settings['console_enabled']:
|
if self.log_settings['console_enabled']:
|
||||||
# console
|
# console
|
||||||
self.add_handler('stream_handler', self.__create_console_handler(
|
self.add_handler('stream_handler', self.__create_console_handler(
|
||||||
self.log_settings['log_level_console'])
|
'stream_handler', self.log_settings['log_level_console'])
|
||||||
)
|
)
|
||||||
# add other handlers,
|
# add other handlers,
|
||||||
if other_handlers is not None:
|
if other_handlers is not None:
|
||||||
@@ -211,8 +507,8 @@ class Log:
|
|||||||
default_log_settings['log_queue'] = __setting
|
default_log_settings['log_queue'] = __setting
|
||||||
return default_log_settings
|
return default_log_settings
|
||||||
|
|
||||||
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
# def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||||
return record.levelname != "EXCEPTION"
|
# return record.levelname != "EXCEPTION"
|
||||||
|
|
||||||
# MARK: add a handler
|
# MARK: add a handler
|
||||||
def add_handler(
|
def add_handler(
|
||||||
@@ -239,7 +535,8 @@ class Log:
|
|||||||
|
|
||||||
# MARK: console handler
|
# MARK: console handler
|
||||||
def __create_console_handler(
|
def __create_console_handler(
|
||||||
self, log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
|
self, handler_name: str,
|
||||||
|
log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
|
||||||
) -> logging.StreamHandler[TextIO]:
|
) -> logging.StreamHandler[TextIO]:
|
||||||
# console logger
|
# console logger
|
||||||
if not self.validate_log_level(log_level_console):
|
if not self.validate_log_level(log_level_console):
|
||||||
@@ -259,17 +556,17 @@ class Log:
|
|||||||
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
|
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
|
||||||
else:
|
else:
|
||||||
formatter_console = logging.Formatter(format_string, datefmt=format_date)
|
formatter_console = logging.Formatter(format_string, datefmt=format_date)
|
||||||
|
console_handler.set_name(handler_name)
|
||||||
console_handler.setLevel(log_level_console.name)
|
console_handler.setLevel(log_level_console.name)
|
||||||
console_handler.set_name('console')
|
|
||||||
# do not show exceptions logs on console
|
# do not show exceptions logs on console
|
||||||
if filter_exceptions:
|
console_handler.addFilter(CustomHandlerFilter('console', filter_exceptions))
|
||||||
console_handler.addFilter(self.__filter_exceptions)
|
|
||||||
console_handler.setFormatter(formatter_console)
|
console_handler.setFormatter(formatter_console)
|
||||||
return console_handler
|
return console_handler
|
||||||
|
|
||||||
# MARK: file handler
|
# MARK: file handler
|
||||||
def __create_time_rotating_file_handler(
|
def __create_timed_rotating_file_handler(
|
||||||
self, log_level_file: LoggingLevel, log_path: Path,
|
self, handler_name: str,
|
||||||
|
log_level_file: LoggingLevel, log_path: Path,
|
||||||
when: str = "D", interval: int = 1, backup_count: int = 0
|
when: str = "D", interval: int = 1, backup_count: int = 0
|
||||||
) -> logging.handlers.TimedRotatingFileHandler:
|
) -> logging.handlers.TimedRotatingFileHandler:
|
||||||
# file logger
|
# file logger
|
||||||
@@ -302,8 +599,10 @@ class Log:
|
|||||||
),
|
),
|
||||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||||
)
|
)
|
||||||
file_handler.set_name('file_timed_rotate')
|
file_handler.set_name(handler_name)
|
||||||
file_handler.setLevel(log_level_file.name)
|
file_handler.setLevel(log_level_file.name)
|
||||||
|
# do not show errors flagged with console (they are from exceptions)
|
||||||
|
file_handler.addFilter(CustomHandlerFilter('file'))
|
||||||
file_handler.setFormatter(formatter_file_handler)
|
file_handler.setFormatter(formatter_file_handler)
|
||||||
return file_handler
|
return file_handler
|
||||||
|
|
||||||
@@ -325,6 +624,14 @@ class Log:
|
|||||||
)
|
)
|
||||||
self.listener.start()
|
self.listener.start()
|
||||||
|
|
||||||
|
def stop_listener(self):
|
||||||
|
"""
|
||||||
|
stop the listener
|
||||||
|
"""
|
||||||
|
if self.listener is not None:
|
||||||
|
self.flush()
|
||||||
|
self.listener.stop()
|
||||||
|
|
||||||
# MARK: init main log
|
# MARK: init main log
|
||||||
def __init_log(self, log_name: str) -> None:
|
def __init_log(self, log_name: str) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -367,239 +674,16 @@ class Log:
|
|||||||
|
|
||||||
return root_logger
|
return root_logger
|
||||||
|
|
||||||
# FIXME: we need to add a custom formater to add stack level listing if we want to
|
def get_logger_settings(self) -> LoggerInit:
|
||||||
# Important note, although they exist, it is recommended to use self.logger.NAME directly
|
|
||||||
# so that the correct filename, method and row number is set
|
|
||||||
# for > 50 use logger.log(LoggingLevel.<LEVEL>.value, ...)
|
|
||||||
# for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True)
|
|
||||||
# MARK: log message
|
|
||||||
def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None):
|
|
||||||
"""log general"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(level, msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: DEBUG 10
|
|
||||||
def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""debug"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.debug(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: INFO 20
|
|
||||||
def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""info"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.info(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: WARNING 30
|
|
||||||
def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""warning"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.warning(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: ERROR 40
|
|
||||||
def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""error"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.error(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: CRITICAL 50
|
|
||||||
def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""critcal"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.critical(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: ALERT 55
|
|
||||||
def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""alert"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
# extra_dict = dict(extra)
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: EMERGECNY: 60
|
|
||||||
def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""emergency"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: EXCEPTION: 70
|
|
||||||
def exception(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""
|
"""
|
||||||
log on exceotion level, this is log.exception, but logs with a new level
|
get the logger settings we need to init the Logger class
|
||||||
|
|
||||||
Args:
|
|
||||||
msg (object): _description_
|
|
||||||
*args (object): arguments for msg
|
|
||||||
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
|
||||||
"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: break line
|
|
||||||
def break_line(self, info: str = "BREAK"):
|
|
||||||
"""
|
|
||||||
add a break line as info level
|
|
||||||
|
|
||||||
Keyword Arguments:
|
|
||||||
info {str} -- _description_ (default: {"BREAK"})
|
|
||||||
"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
|
|
||||||
|
|
||||||
# MARK: queue handling
|
|
||||||
def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool:
|
|
||||||
"""
|
|
||||||
Flush all pending messages
|
|
||||||
|
|
||||||
Keyword Arguments:
|
|
||||||
handler_name {str | None} -- _description_ (default: {None})
|
|
||||||
timeout {float} -- _description_ (default: {2.0})
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
bool -- _description_
|
LoggerInit -- _description_
|
||||||
"""
|
"""
|
||||||
if not self.listener or not self.log_queue:
|
return {
|
||||||
return False
|
"logger": self.logger,
|
||||||
|
"log_queue": self.log_queue
|
||||||
try:
|
}
|
||||||
# Wait for queue to be processed
|
|
||||||
start_time = time.time()
|
|
||||||
while not self.log_queue.empty() and (time.time() - start_time) < timeout:
|
|
||||||
time.sleep(0.01)
|
|
||||||
|
|
||||||
# Flush all handlers or handler given
|
|
||||||
if handler_name:
|
|
||||||
try:
|
|
||||||
self.handlers[handler_name].flush()
|
|
||||||
except IndexError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
for handler in self.handlers.values():
|
|
||||||
handler.flush()
|
|
||||||
except OSError:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def stop_listener(self):
|
|
||||||
"""
|
|
||||||
stop the listener
|
|
||||||
"""
|
|
||||||
if self.listener is not None:
|
|
||||||
self.flush()
|
|
||||||
self.listener.stop()
|
|
||||||
|
|
||||||
# MARK: log level handling
|
|
||||||
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
|
|
||||||
"""
|
|
||||||
set the logging level for a handler
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
handler {str} -- _description_
|
|
||||||
log_level {LoggingLevel} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool -- _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# flush queue befoe changing logging level
|
|
||||||
self.flush(handler_name)
|
|
||||||
self.handlers[handler_name].setLevel(log_level.name)
|
|
||||||
return True
|
|
||||||
except IndexError:
|
|
||||||
if self.logger:
|
|
||||||
self.logger.error('Handler %s not found, cannot change log level', handler_name)
|
|
||||||
return False
|
|
||||||
except AttributeError:
|
|
||||||
if self.logger:
|
|
||||||
self.logger.error(
|
|
||||||
'Cannot change to log level %s for handler %s, log level invalid',
|
|
||||||
LoggingLevel.name, handler_name
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_log_level(self, handler_name: str) -> LoggingLevel:
|
|
||||||
"""
|
|
||||||
gettthe logging level for a handler
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
handler_name {str} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
LoggingLevel -- _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return self.handlers[handler_name]
|
|
||||||
except IndexError:
|
|
||||||
return LoggingLevel.NOTSET
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def validate_log_level(log_level: Any) -> bool:
|
|
||||||
"""
|
|
||||||
if the log level is invalid will return false, else return true
|
|
||||||
|
|
||||||
Args:
|
|
||||||
log_level (Any): _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
_ = LoggingLevel.from_any(log_level).value
|
|
||||||
return True
|
|
||||||
except ValueError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_log_level_int(log_level: Any) -> int:
|
|
||||||
"""
|
|
||||||
Return log level as INT
|
|
||||||
If invalid returns the default log level
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
log_level {Any} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
int -- _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return LoggingLevel.from_any(log_level).value
|
|
||||||
except ValueError:
|
|
||||||
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
|
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
52
test-run/iterator_handling/data_search.py
Normal file
52
test-run/iterator_handling/data_search.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
"""
|
||||||
|
Search data tests
|
||||||
|
iterator_handling.data_search
|
||||||
|
"""
|
||||||
|
|
||||||
|
from corelibs.debug_handling.dump_data import dump_data
|
||||||
|
from corelibs.iterator_handling.data_search import find_in_array_from_list, ArraySearchList
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """
    Exercise find_in_array_from_list with a two-entry lookup list.
    """
    records = [
        {
            "lookup_value_p": "A01",
            "lookup_value_c": "B01",
            "replace_value": "R01",
        },
        {
            "lookup_value_p": "A02",
            "lookup_value_c": "B02",
            "replace_value": "R02",
        },
    ]
    # a single search entry, printed for visual inspection
    sample_entry = ArraySearchList(key="lookup_value_p", value="A01")
    print(sample_entry)
    # both conditions must point at the same record
    search: list[ArraySearchList] = [
        {"key": "lookup_value_p", "value": "A01"},
        {"key": "lookup_value_c", "value": "B01"},
    ]
    result = find_in_array_from_list(records, search)
    print(f"Search {dump_data(search)} -> {dump_data(result)}")


if __name__ == "__main__":
    main()

# __END__
|
||||||
106
test-run/iterator_handling/dict_helpers.py
Normal file
106
test-run/iterator_handling/dict_helpers.py
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
"""
|
||||||
|
Iterator helper testing
|
||||||
|
"""
|
||||||
|
|
||||||
|
from corelibs.debug_handling.dump_data import dump_data
|
||||||
|
from corelibs.iterator_handling.dict_helpers import mask
|
||||||
|
|
||||||
|
|
||||||
|
def __mask():
    """
    Ad-hoc test driver for mask(): builds sample dicts (several variants,
    most entries commented out) and prints the data before and after masking.
    """
    # first variant: flat dict with a list value under a sensitive key
    data = {
        # "user": "john",
        # "encryption_key": "Secret key",
        # "ENCRYPTION.TEST": "Secret key test",
        # "inside_password_test": "Hide this",
        "password": ["secret1", "secret2"],  # List value gets masked
        # "config": {
        #     "db_password": {"primary": "secret", "backup": "secret2"},  # Dict value gets masked
        #     "api_keys": ["key1", "key2", "key3"]  # List value gets masked
        # },
        # "items": [  # List value that doesn't get masked, but gets processed recursively
        #     {"name": "item1", "secret_key": "itemsecret"},
        #     {"name": "item2", "passwords": ["pass1", "pass2"]}
        # ],
        # "normal_list": ["item1", "item2", "item3"]  # Normal list, not masked
    }
    # second variant: nested "config" block; rebinds data, first variant unused
    data = {
        "config": {
            # "password": ["secret1", "secret2"],
            # "password_other": {"password": ["secret1", "secret2"]},
            # "database": {
            #     "host": "localhost",
            #     "password": "db_secret",
            #     "users": [
            #         {"name": "admin", "password": "admin123"},
            #         {"name": "user", "secret_key": "user456"}
            #     ]
            # },
            # "api": {
            #     # "endpoints": ["api1", "api2"],
            #     "encryption_settings": {
            #         "enabled": True,
            #         "secret": "api_secret"
            #     }
            # }
            "secret_key": "normal_value",
            "api_key": "normal_value",
            "my_key_value": "normal_value",
        }
    }
    # third variant (the one actually passed to mask below):
    # realistic application configuration with credential-like values
    data = {
        "basic": {
            "log_level_console": "DEBUG",
            "log_level_file": "DEBUG",
            "storage_interface": "sqlite",
            "content_start_date": "2023-1-1",
            "encryption_key": "ENCRYPTION_KEY"
        },
        "email": {
            "alert_email": [
                "test+z-sd@tequila.jp"
            ]
        },
        "poller": {
            "max_forks": "1",
            "interface": "Zac"
        },
        "pusher": {
            "max_forks": "3",
            "interface": "Screendragon"
        },
        "api:Zac": {
            "type": "zac",
            "client_id": "oro_zac_demo",
            "client_secret": "CLIENT_SECRET",
            "username": "zacuser",
            "password": "ZACuser3",
            "hostname": "e-gra2.zac.ai",
            "appname": "e-gra2_api_trial",
            "api_path": "b/api/v2"
        },
        "api:Screendragon": {
            "type": "screendragon",
            "client_id": "omniprostaging",
            "encryption_client": "SOME_SECRET",
            "client_encryption": "SOME_SECRET",
            "secret_client": "SOME_SECRET",
            "client_secret": "SOME_SECRET",
            "hostname": "omniprostaging.screendragon.com",
            "appname": "sdapi",
            "api_path": "api"
        }
    }
    result = mask(data)
    # show original and masked data side by side for manual comparison
    print(f"** In: {dump_data(data)}")
    print(f"===> Masked: {dump_data(result)}")
|
|
||||||
|
|
||||||
|
def main():
    """
    Test: corelibs.iterator_handling.dict_helpers
    """
    __mask()


if __name__ == "__main__":
    main()
|
||||||
@@ -5,7 +5,7 @@ Log logging_handling.log testing
|
|||||||
# import atexit
|
# import atexit
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
# this is for testing only
|
# this is for testing only
|
||||||
from corelibs.logging_handling.log import Log
|
from corelibs.logging_handling.log import Log, Logger
|
||||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||||
|
|
||||||
|
|
||||||
@@ -18,16 +18,19 @@ def main():
|
|||||||
log_path=script_path.joinpath('log', 'test.log'),
|
log_path=script_path.joinpath('log', 'test.log'),
|
||||||
log_name="Test Log",
|
log_name="Test Log",
|
||||||
log_settings={
|
log_settings={
|
||||||
# "log_level_console": 'DEBUG',
|
"log_level_console": 'DEBUG',
|
||||||
"log_level_console": None,
|
# "log_level_console": None,
|
||||||
"log_level_file": 'DEBUG',
|
"log_level_file": 'DEBUG',
|
||||||
# "console_color_output_enabled": False,
|
# "console_color_output_enabled": False,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
logn = Logger(log.get_logger_settings())
|
||||||
|
|
||||||
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
|
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||||
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
|
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||||
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
|
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
|
||||||
|
logn.lg.debug('[NORMAL N] Debug test: %s', log.logger.name)
|
||||||
|
logn.debug('[NORMAL N-] Debug test: %s', log.logger.name)
|
||||||
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
|
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
|
||||||
log.info('[NORMAL-] Info test: %s', log.logger.name)
|
log.info('[NORMAL-] Info test: %s', log.logger.name)
|
||||||
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
|
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
|
||||||
@@ -76,10 +79,13 @@ def main():
|
|||||||
print(f"Divied: {__test}")
|
print(f"Divied: {__test}")
|
||||||
except ZeroDivisionError as e:
|
except ZeroDivisionError as e:
|
||||||
log.logger.critical("Divison through zero: %s", e)
|
log.logger.critical("Divison through zero: %s", e)
|
||||||
log.exception("Divison through zero")
|
log.exception("Divison through zero: %s", e)
|
||||||
|
|
||||||
for handler in log.logger.handlers:
|
for handler in log.logger.handlers:
|
||||||
print(f"Handler (logger) {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
print(
|
||||||
|
f"** Handler (logger) {handler} [{handler.name}] -> "
|
||||||
|
f"{handler.level} -> {LoggingLevel.from_any(handler.level)}"
|
||||||
|
)
|
||||||
|
|
||||||
for key, handler in log.handlers.items():
|
for key, handler in log.handlers.items():
|
||||||
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
||||||
|
|||||||
291
tests/unit/iterator_handling/test_dict_helpers.py
Normal file
291
tests/unit/iterator_handling/test_dict_helpers.py
Normal file
@@ -0,0 +1,291 @@
|
|||||||
|
"""
|
||||||
|
tests for corelibs.iterator_handling.dict_helpers
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from typing import Any
|
||||||
|
from corelibs.iterator_handling.dict_helpers import mask
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_default_behavior():
    """Test masking with default mask_keys"""
    payload = {
        "username": "john_doe",
        "password": "secret123",
        "email": "john@example.com",
        "api_secret": "abc123",
        "encryption_key": "xyz789",
    }
    masked = mask(payload)
    # non-sensitive keys stay untouched, sensitive ones are replaced
    assert masked["username"] == "john_doe"
    assert masked["password"] == "***"
    assert masked["email"] == "john@example.com"
    assert masked["api_secret"] == "***"
    assert masked["encryption_key"] == "***"
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_keys():
    """Test masking with custom mask_keys"""
    payload = {
        "username": "john_doe",
        "token": "abc123",
        "api_key": "xyz789",
        "password": "secret123",
    }
    masked = mask(payload, mask_keys=["token", "api"])
    assert masked["username"] == "john_doe"
    assert masked["token"] == "***"
    assert masked["api_key"] == "***"
    # custom keys replace the defaults, so "password" is no longer masked
    assert masked["password"] == "secret123"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_mask_string():
    """Test masking with custom mask string"""
    masked = mask({"password": "secret123"}, mask_str="[HIDDEN]")
    assert masked["password"] == "[HIDDEN]"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_case_insensitive():
    """Test that masking is case insensitive"""
    payload = {
        "PASSWORD": "secret123",
        "Secret_Key": "abc123",
        "ENCRYPTION_data": "xyz789",
    }
    masked = mask(payload)
    # matching ignores case for every default key
    assert masked["PASSWORD"] == "***"
    assert masked["Secret_Key"] == "***"
    assert masked["ENCRYPTION_data"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_key_patterns():
    """Test different key matching patterns (start, end, contains)"""
    payload = {
        "password_hash": "hash123",        # starts with
        "user_password": "secret123",      # ends with
        "my_secret_key": "abc123",         # contains with edges
        "secretvalue": "xyz789",           # contains without edges
        "startsecretvalue": "xyz123",      # contains without edges
        "normal_key": "normal_value",
    }
    masked = mask(payload)
    assert masked["password_hash"] == "***"
    assert masked["user_password"] == "***"
    assert masked["my_secret_key"] == "***"
    # masked because the key starts with the match word
    assert masked["secretvalue"] == "***"
    # not masked: match word is embedded without edges at either side
    assert masked["startsecretvalue"] == "xyz123"
    assert masked["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_edges():
    """Test masking with custom edge characters"""
    payload = {
        "my-secret-key": "abc123",
        "my_secret_key": "xyz789",
    }
    masked = mask(payload, mask_str_edges="-")
    assert masked["my-secret-key"] == "***"
    # underscore edges do not match the dash edge character
    assert masked["my_secret_key"] == "xyz789"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_empty_edges():
    """Test masking with empty edge characters (substring matching)"""
    payload = {
        "secretvalue": "abc123",
        "mysecretkey": "xyz789",
        "normal_key": "normal_value",
    }
    masked = mask(payload, mask_str_edges="")
    # with no edge characters any substring match triggers masking
    assert masked["secretvalue"] == "***"
    assert masked["mysecretkey"] == "***"
    assert masked["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_nested_dict():
    """Test masking nested dictionaries"""
    payload = {
        "user": {
            "name": "john",
            "password": "secret123",
            "profile": {
                "email": "john@example.com",
                "encryption_key": "abc123",
            },
        },
        "api_secret": "xyz789",
    }
    masked = mask(payload)
    # masking recurses into nested dicts at every depth
    assert masked["user"]["name"] == "john"
    assert masked["user"]["password"] == "***"
    assert masked["user"]["profile"]["email"] == "john@example.com"
    assert masked["user"]["profile"]["encryption_key"] == "***"
    assert masked["api_secret"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_lists():
    """Test masking lists and nested structures with lists"""
    payload = {
        "users": [
            {"name": "john", "password": "secret1"},
            {"name": "jane", "password": "secret2"},
        ],
        "secrets": ["secret1", "secret2", "secret3"],
    }
    result = mask(payload)
    print(f"R {result['secrets']}")
    # dicts inside lists are processed recursively
    assert result["users"][0]["name"] == "john"
    assert result["users"][0]["password"] == "***"
    assert result["users"][1]["name"] == "jane"
    assert result["users"][1]["password"] == "***"
    # a list under a sensitive key has every element masked
    assert result["secrets"] == ["***", "***", "***"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_mixed_types():
    """Masking replaces values of any type (str, int, bool, float, None)."""
    payload = {
        "password": "string_value",
        "secret_number": 12345,
        "encryption_flag": True,
        "secret_float": 3.14,
        "password_none": None,
        "normal_key": "normal_value",
    }

    masked = mask(payload)

    # Every sensitive key is replaced with the mask string regardless of
    # the original value's type.
    sensitive = (
        "password",
        "secret_number",
        "encryption_flag",
        "secret_float",
        "password_none",
    )
    for key in sensitive:
        assert masked[key] == "***"
    assert masked["normal_key"] == "normal_value"
||||||
|
def test_mask_skip_true():
    """skip=True must hand back the input data completely unchanged."""
    payload = {
        "password": "secret123",
        "encryption_key": "abc123",
        "normal_key": "normal_value",
    }

    untouched = mask(payload, skip=True)

    assert untouched == payload
    # Not just equal content: the very same object must come back.
    assert untouched is payload
||||||
|
def test_mask_empty_dict():
    """Masking an empty dictionary yields an empty dictionary."""
    empty: dict[str, Any] = {}

    assert mask(empty) == {}
||||||
|
def test_mask_none_mask_keys():
    """Passing mask_keys=None falls back to the default key list."""
    payload = {"password": "secret123", "token": "abc123"}

    masked = mask(payload, mask_keys=None)

    assert masked["password"] == "***"
    # "token" is not part of the default mask keys.
    assert masked["token"] == "abc123"
||||||
|
def test_mask_empty_mask_keys():
    """An empty mask_keys list disables masking entirely."""
    payload = {"password": "secret123", "secret": "abc123"}

    masked = mask(payload, mask_keys=[])

    # Nothing matches, so every value survives untouched.
    assert masked["password"] == "secret123"
    assert masked["secret"] == "abc123"
||||||
|
def test_mask_complex_nested_structure():
    """Masking handles deep nesting: dicts in lists in dicts."""
    payload = {
        "config": {
            "database": {
                "host": "localhost",
                "password": "db_secret",
                "users": [
                    {"name": "admin", "password": "admin123"},
                    {"name": "user", "secret_key": "user456"},
                ],
            },
            "api": {
                "endpoints": ["api1", "api2"],
                "encryption_settings": {
                    "enabled": True,
                    "secret": "api_secret",
                },
            },
        },
    }

    masked = mask(payload)

    database = masked["config"]["database"]
    assert database["host"] == "localhost"
    assert database["password"] == "***"
    assert database["users"][0]["name"] == "admin"
    assert database["users"][0]["password"] == "***"
    assert database["users"][1]["name"] == "user"
    assert database["users"][1]["secret_key"] == "***"

    api = masked["config"]["api"]
    assert api["endpoints"] == ["api1", "api2"]
    assert api["encryption_settings"]["enabled"] is True
    assert api["encryption_settings"]["secret"] == "***"
||||||
|
def test_mask_preserves_original_data():
    """mask() must not mutate the dictionary it is given."""
    source = {
        "password": "secret123",
        "username": "john_doe",
    }
    snapshot = source.copy()

    masked = mask(source)

    # The input is left exactly as it was before the call.
    assert source == snapshot
    assert source["password"] == "secret123"
    # The returned structure differs and carries the masked value.
    assert masked != source
    assert masked["password"] == "***"
||||||
|
@pytest.mark.parametrize("mask_key,expected_keys", [
    (["pass"], ["password", "user_pass", "my_pass_key"]),
    (["key"], ["api_key", "secret_key", "my_key_value"]),
    (["token"], ["token", "auth_token", "my_token_here"]),
])
def test_mask_parametrized_keys(mask_key: list[str], expected_keys: list[str]):
    """Each mask-key pattern masks every key it occurs in."""
    payload: dict[str, str] = {name: "value" for name in expected_keys}
    payload["normal_entry"] = "normal_value"

    masked = mask(payload, mask_keys=mask_key)

    for hit in expected_keys:
        assert masked[hit] == "***"
    # An unrelated key is never masked.
    assert masked["normal_entry"] == "normal_value"
|
||||||
Reference in New Issue
Block a user