Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6090995eba | ||
|
|
60db747d6d | ||
|
|
a7a4141f58 | ||
|
|
2b04cbe239 | ||
|
|
765cc061c1 | ||
|
|
80319385f0 | ||
|
|
29dd906fe0 | ||
|
|
d5dc4028c3 | ||
|
|
0df049d453 | ||
|
|
0bd7c1f685 | ||
|
|
2f08ecabbf | ||
|
|
12af1c80dc | ||
|
|
a52b6e0a55 | ||
|
|
a586cf65e2 | ||
|
|
e2e7882bfa | ||
|
|
4f9c2b9d5f | ||
|
|
5203bcf1ea | ||
|
|
f1e3bc8559 | ||
|
|
b97ca6f064 | ||
|
|
d1ea9874da | ||
|
|
3cd3f87d68 | ||
|
|
582937b866 | ||
|
|
2b8240c156 | ||
|
|
abf4b7ac89 | ||
|
|
9c49f83c16 | ||
|
|
3a625ed0ee | ||
|
|
2cfbf4bb90 | ||
|
|
5767533668 | ||
|
|
24798f19ca | ||
|
|
26f8249187 | ||
|
|
dcefa564da | ||
|
|
edd35dccea | ||
|
|
ea527ea60c | ||
|
|
fd5e1db22b | ||
|
|
39e23faf7f | ||
|
|
de285b531a | ||
|
|
0a29a592f9 | ||
|
|
e045b1d3b5 | ||
|
|
280e5fa861 | ||
|
|
472d3495b5 | ||
|
|
2778ac6870 | ||
|
|
743a0a8ac9 | ||
|
|
694712ed2e | ||
|
|
ea3b4f1790 |
6
ToDo.md
6
ToDo.md
@@ -1,5 +1,7 @@
|
|||||||
# ToDo list
|
# ToDo list
|
||||||
|
|
||||||
- [ ] stub files .pyi
|
- [x] stub files .pyi
|
||||||
- [ ] Add tests for all, we need 100% test coverate
|
- [ ] Add tests for all, we need 100% test coverate
|
||||||
- [ ] Log: add custom format for "stack_correct" if set, this will override the normal stack block
|
- [x] Log: add custom format for "stack_correct" if set, this will override the normal stack block
|
||||||
|
- [ ] Log: add rotate for size based
|
||||||
|
- [ ] All folders and file names need to be revisited for naming and content collection
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
# MARK: Project info
|
# MARK: Project info
|
||||||
[project]
|
[project]
|
||||||
name = "corelibs"
|
name = "corelibs"
|
||||||
version = "0.13.1"
|
version = "0.22.3"
|
||||||
description = "Collection of utils for Python scripts"
|
description = "Collection of utils for Python scripts"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.13"
|
requires-python = ">=3.13"
|
||||||
|
|||||||
@@ -19,19 +19,19 @@ def compile_re(reg: str) -> re.Pattern[str]:
|
|||||||
|
|
||||||
|
|
||||||
# email regex
|
# email regex
|
||||||
EMAIL_BASIC_REGEX = r"""
|
EMAIL_BASIC_REGEX: str = r"""
|
||||||
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
||||||
"""
|
"""
|
||||||
# Domain regex with localhost
|
# Domain regex with localhost
|
||||||
DOMAIN_WITH_LOCALHOST_REGEX = r"""
|
DOMAIN_WITH_LOCALHOST_REGEX: str = r"""
|
||||||
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})$
|
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})$
|
||||||
"""
|
"""
|
||||||
# domain regex with loclhost and optional port
|
# domain regex with loclhost and optional port
|
||||||
DOMAIN_WITH_LOCALHOST_PORT_REGEX = r"""
|
DOMAIN_WITH_LOCALHOST_PORT_REGEX: str = r"""
|
||||||
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})(?::\d+)?$
|
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})(?::\d+)?$
|
||||||
"""
|
"""
|
||||||
# Domain, no localhost
|
# Domain, no localhost
|
||||||
DOMAIN_REGEX = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$"
|
DOMAIN_REGEX: str = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$"
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -4,30 +4,72 @@ Various debug helpers
|
|||||||
|
|
||||||
import traceback
|
import traceback
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
from typing import Tuple, Type
|
||||||
|
from types import TracebackType
|
||||||
|
|
||||||
|
# _typeshed.OptExcInfo
|
||||||
|
OptExcInfo = Tuple[None, None, None] | Tuple[Type[BaseException], BaseException, TracebackType]
|
||||||
|
|
||||||
def traceback_call_str(start: int = 2, depth: int = 1):
|
def call_stack(
|
||||||
|
start: int = 0,
|
||||||
|
skip_last: int = -1,
|
||||||
|
separator: str = ' -> ',
|
||||||
|
reset_start_if_empty: bool = False
|
||||||
|
) -> str:
|
||||||
"""
|
"""
|
||||||
get the trace for the last entry
|
get the trace for the last entry
|
||||||
|
|
||||||
Keyword Arguments:
|
Keyword Arguments:
|
||||||
start {int} -- _description_ (default: {2})
|
start {int} -- start, if too might output will empty until reset_start_if_empty is set (default: {0})
|
||||||
depth {int} -- _description_ (default: {1})
|
skip_last {int} -- how many of the last are skipped, defaults to -1 for current method (default: {-1})
|
||||||
|
seperator {str} -- add stack separator, if empty defaults to ' -> ' (default: { -> })
|
||||||
|
reset_start_if_empty {bool} -- if no stack returned because of too high start,
|
||||||
|
reset to 0 for full read (default: {False})
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
_type_ -- _description_
|
str -- _description_
|
||||||
"""
|
"""
|
||||||
# can't have more than in the stack for depth
|
# stack = traceback.extract_stack()[start:depth]
|
||||||
depth = min(depth, start)
|
# how many of the last entries we skip (so we do not get self), default is -1
|
||||||
depth = start - depth
|
# start cannot be negative
|
||||||
# 0 is full stack length from start
|
if skip_last > 0:
|
||||||
if depth == 0:
|
skip_last = skip_last * -1
|
||||||
stack = traceback.extract_stack()[-start:]
|
stack = traceback.extract_stack()
|
||||||
|
__stack = stack[start:skip_last]
|
||||||
|
# start possible to high, reset start to 0
|
||||||
|
if not __stack and reset_start_if_empty:
|
||||||
|
start = 0
|
||||||
|
__stack = stack[start:skip_last]
|
||||||
|
if not separator:
|
||||||
|
separator = ' -> '
|
||||||
|
# print(f"* HERE: {dump_data(stack)}")
|
||||||
|
return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in __stack)
|
||||||
|
|
||||||
|
|
||||||
|
def exception_stack(
|
||||||
|
exc_stack: OptExcInfo | None = None,
|
||||||
|
separator: str = ' -> '
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Exception traceback, if no sys.exc_info is set, run internal
|
||||||
|
|
||||||
|
Keyword Arguments:
|
||||||
|
exc_stack {OptExcInfo | None} -- _description_ (default: {None})
|
||||||
|
separator {str} -- _description_ (default: {' -> '})
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str -- _description_
|
||||||
|
"""
|
||||||
|
if exc_stack is not None:
|
||||||
|
_, _, exc_traceback = exc_stack
|
||||||
else:
|
else:
|
||||||
stack = traceback.extract_stack()[-start:-depth]
|
exc_traceback = None
|
||||||
return ' -> '.join(
|
_, _, exc_traceback = sys.exc_info()
|
||||||
f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}"
|
stack = traceback.extract_tb(exc_traceback)
|
||||||
for f in stack
|
if not separator:
|
||||||
)
|
separator = ' -> '
|
||||||
|
# print(f"* HERE: {dump_data(stack)}")
|
||||||
|
return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in stack)
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import json
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
def dump_data(data: dict[Any, Any] | list[Any] | str | None) -> str:
|
def dump_data(data: Any) -> str:
|
||||||
"""
|
"""
|
||||||
dump formated output from dict/list
|
dump formated output from dict/list
|
||||||
|
|
||||||
|
|||||||
23
src/corelibs/exceptions/csv_exceptions.py
Normal file
23
src/corelibs/exceptions/csv_exceptions.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
"""
|
||||||
|
Exceptions for csv file reading and processing
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class NoCsvReader(Exception):
|
||||||
|
"""
|
||||||
|
CSV reader is none
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class CsvHeaderDataMissing(Exception):
|
||||||
|
"""
|
||||||
|
The csv reader returned None as headers, the header column in the csv file is missing
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class CompulsoryCsvHeaderCheckFailed(Exception):
|
||||||
|
"""
|
||||||
|
raise if the header is not matching to the excpeted values
|
||||||
|
"""
|
||||||
|
|
||||||
|
# __END__
|
||||||
@@ -2,23 +2,40 @@
|
|||||||
wrapper around search path
|
wrapper around search path
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import Any
|
from typing import Any, TypedDict, NotRequired
|
||||||
|
from warnings import deprecated
|
||||||
|
|
||||||
|
|
||||||
|
class ArraySearchList(TypedDict):
|
||||||
|
"""find in array from list search dict"""
|
||||||
|
key: str
|
||||||
|
value: str | bool | int | float | list[str | None]
|
||||||
|
case_sensitive: NotRequired[bool]
|
||||||
|
|
||||||
|
|
||||||
|
@deprecated("Use find_in_array_from_list()")
|
||||||
def array_search(
|
def array_search(
|
||||||
search_params: list[dict[str, str | bool | list[str | None]]],
|
search_params: list[ArraySearchList],
|
||||||
data: list[dict[str, Any]],
|
data: list[dict[str, Any]],
|
||||||
return_index: bool = False
|
return_index: bool = False
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""depreacted, old call order"""
|
||||||
|
return find_in_array_from_list(data, search_params, return_index)
|
||||||
|
|
||||||
|
def find_in_array_from_list(
|
||||||
|
data: list[dict[str, Any]],
|
||||||
|
search_params: list[ArraySearchList],
|
||||||
|
return_index: bool = False
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
search in an array of dicts with an array of Key/Value set
|
search in an list of dicts with an list of Key/Value set
|
||||||
all Key/Value sets must match
|
all Key/Value sets must match
|
||||||
Value set can be list for OR match
|
Value set can be list for OR match
|
||||||
option: case_senstive: default True
|
option: case_senstive: default True
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
search_params (list): List of search params in "Key"/"Value" lists with options
|
|
||||||
data (list): data to search in, must be a list
|
data (list): data to search in, must be a list
|
||||||
|
search_params (list): List of search params in "key"/"value" lists with options
|
||||||
return_index (bool): return index of list [default False]
|
return_index (bool): return index of list [default False]
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
@@ -32,18 +49,20 @@ def array_search(
|
|||||||
"""
|
"""
|
||||||
if not isinstance(search_params, list): # type: ignore
|
if not isinstance(search_params, list): # type: ignore
|
||||||
raise ValueError("search_params must be a list")
|
raise ValueError("search_params must be a list")
|
||||||
keys = []
|
keys: list[str] = []
|
||||||
|
# check that key and value exist and are set
|
||||||
for search in search_params:
|
for search in search_params:
|
||||||
if not search.get('Key') or not search.get('Value'):
|
if not search.get('key') or not search.get('value'):
|
||||||
raise KeyError(
|
raise KeyError(
|
||||||
f"Either Key '{search.get('Key', '')}' or "
|
f"Either Key '{search.get('key', '')}' or "
|
||||||
f"Value '{search.get('Value', '')}' is missing or empty"
|
f"Value '{search.get('value', '')}' is missing or empty"
|
||||||
)
|
)
|
||||||
# if double key -> abort
|
# if double key -> abort
|
||||||
if search.get("Key") in keys:
|
if search.get("key") in keys:
|
||||||
raise KeyError(
|
raise KeyError(
|
||||||
f"Key {search.get('Key', '')} already exists in search_params"
|
f"Key {search.get('key', '')} already exists in search_params"
|
||||||
)
|
)
|
||||||
|
keys.append(str(search['key']))
|
||||||
|
|
||||||
return_items: list[dict[str, Any]] = []
|
return_items: list[dict[str, Any]] = []
|
||||||
for si_idx, search_item in enumerate(data):
|
for si_idx, search_item in enumerate(data):
|
||||||
@@ -55,20 +74,20 @@ def array_search(
|
|||||||
# lower case left side
|
# lower case left side
|
||||||
# TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"]
|
# TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"]
|
||||||
if search.get("case_sensitive", True) is False:
|
if search.get("case_sensitive", True) is False:
|
||||||
search_value = search_item.get(str(search['Key']), "").lower()
|
search_value = search_item.get(str(search['key']), "").lower()
|
||||||
else:
|
else:
|
||||||
search_value = search_item.get(str(search['Key']), "")
|
search_value = search_item.get(str(search['key']), "")
|
||||||
# lower case right side
|
# lower case right side
|
||||||
if isinstance(search['Value'], list):
|
if isinstance(search['value'], list):
|
||||||
search_in = [
|
search_in = [
|
||||||
str(k).lower()
|
str(k).lower()
|
||||||
if search.get("case_sensitive", True) is False else k
|
if search.get("case_sensitive", True) is False else k
|
||||||
for k in search['Value']
|
for k in search['value']
|
||||||
]
|
]
|
||||||
elif search.get("case_sensitive", True) is False:
|
elif search.get("case_sensitive", True) is False:
|
||||||
search_in = str(search['Value']).lower()
|
search_in = str(search['value']).lower()
|
||||||
else:
|
else:
|
||||||
search_in = search['Value']
|
search_in = search['value']
|
||||||
# compare check
|
# compare check
|
||||||
if (
|
if (
|
||||||
(
|
(
|
||||||
|
|||||||
@@ -3,26 +3,36 @@ Dict helpers
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
from typing import Any
|
from typing import TypeAlias, Union, Dict, List, Any, cast
|
||||||
|
|
||||||
|
# definitions for the mask run below
|
||||||
|
MaskableValue: TypeAlias = Union[str, int, float, bool, None]
|
||||||
|
NestedDict: TypeAlias = Dict[str, Union[MaskableValue, List[Any], 'NestedDict']]
|
||||||
|
ProcessableValue: TypeAlias = Union[MaskableValue, List[Any], NestedDict]
|
||||||
|
|
||||||
|
|
||||||
def mask(
|
def mask(
|
||||||
data_set: dict[str, str],
|
data_set: dict[str, Any],
|
||||||
mask_keys: list[str] | None = None,
|
mask_keys: list[str] | None = None,
|
||||||
mask_str: str = "***",
|
mask_str: str = "***",
|
||||||
|
mask_str_edges: str = '_',
|
||||||
skip: bool = False
|
skip: bool = False
|
||||||
) -> dict[str, str]:
|
) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
mask data for output
|
mask data for output
|
||||||
Checks if mask_keys list exist in any key in the data set either from the start or at the end
|
Checks if mask_keys list exist in any key in the data set either from the start or at the end
|
||||||
|
|
||||||
|
Use the mask_str_edges to define how searches inside a string should work. Default it must start
|
||||||
|
and end with '_', remove to search string in string
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
data_set {dict[str, str]} -- _description_
|
data_set {dict[str, str]} -- _description_
|
||||||
|
|
||||||
Keyword Arguments:
|
Keyword Arguments:
|
||||||
mask_keys {list[str] | None} -- _description_ (default: {None})
|
mask_keys {list[str] | None} -- _description_ (default: {None})
|
||||||
mask_str {str} -- _description_ (default: {"***"})
|
mask_str {str} -- _description_ (default: {"***"})
|
||||||
skip {bool} -- _description_ (default: {False})
|
mask_str_edges {str} -- _description_ (default: {"_"})
|
||||||
|
skip {bool} -- if set to true skip (default: {False})
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str] -- _description_
|
dict[str, str] -- _description_
|
||||||
@@ -30,29 +40,46 @@ def mask(
|
|||||||
if skip is True:
|
if skip is True:
|
||||||
return data_set
|
return data_set
|
||||||
if mask_keys is None:
|
if mask_keys is None:
|
||||||
mask_keys = ["password", "secret"]
|
mask_keys = ["encryption", "password", "secret"]
|
||||||
|
else:
|
||||||
|
# make sure it is lower case
|
||||||
|
mask_keys = [mask_key.lower() for mask_key in mask_keys]
|
||||||
|
|
||||||
|
def should_mask_key(key: str) -> bool:
|
||||||
|
"""Check if a key should be masked"""
|
||||||
|
__key_lower = key.lower()
|
||||||
|
return any(
|
||||||
|
__key_lower.startswith(mask_key) or
|
||||||
|
__key_lower.endswith(mask_key) or
|
||||||
|
f"{mask_str_edges}{mask_key}{mask_str_edges}" in __key_lower
|
||||||
|
for mask_key in mask_keys
|
||||||
|
)
|
||||||
|
|
||||||
|
def mask_recursive(obj: ProcessableValue) -> ProcessableValue:
|
||||||
|
"""Recursively mask values in nested structures"""
|
||||||
|
if isinstance(obj, dict):
|
||||||
|
return {
|
||||||
|
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
|
||||||
|
for key, value in obj.items()
|
||||||
|
}
|
||||||
|
if isinstance(obj, list):
|
||||||
|
return [mask_recursive(item) for item in obj]
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def mask_value(value: Any) -> Any:
|
||||||
|
"""Handle masking based on value type"""
|
||||||
|
if isinstance(value, list):
|
||||||
|
# Mask each individual value in the list
|
||||||
|
return [mask_str for _ in cast('list[Any]', value)]
|
||||||
|
if isinstance(value, dict):
|
||||||
|
# Recursively process the dictionary instead of masking the whole thing
|
||||||
|
return mask_recursive(cast('ProcessableValue', value))
|
||||||
|
# Mask primitive values
|
||||||
|
return mask_str
|
||||||
|
|
||||||
return {
|
return {
|
||||||
key: mask_str
|
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
|
||||||
if any(key.startswith(mask_key) or key.endswith(mask_key) for mask_key in mask_keys) else value
|
|
||||||
for key, value in data_set.items()
|
for key, value in data_set.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def set_entry(dict_set: dict[str, Any], key: str, value_set: Any) -> dict[str, Any]:
|
|
||||||
"""
|
|
||||||
set a new entry in the dict set
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
key {str} -- _description_
|
|
||||||
dict_set {dict[str, Any]} -- _description_
|
|
||||||
value_set {Any} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict[str, Any] -- _description_
|
|
||||||
"""
|
|
||||||
if not dict_set.get(key):
|
|
||||||
dict_set[key] = {}
|
|
||||||
dict_set[key] = value_set
|
|
||||||
return dict_set
|
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -32,4 +32,6 @@ def jmespath_search(search_data: dict[Any, Any] | list[Any], search_params: str)
|
|||||||
raise ValueError(f"Type error for search_params: {excp}") from excp
|
raise ValueError(f"Type error for search_params: {excp}") from excp
|
||||||
return search_result
|
return search_result
|
||||||
|
|
||||||
|
# TODO: compile jmespath setup
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -7,12 +7,14 @@ attach "init_worker_logging" with the set log_queue
|
|||||||
import re
|
import re
|
||||||
import logging.handlers
|
import logging.handlers
|
||||||
import logging
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import atexit
|
||||||
from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
|
||||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||||
from corelibs.string_handling.text_colors import Colors
|
from corelibs.string_handling.text_colors import Colors
|
||||||
from corelibs.debug_handling.debug_helpers import traceback_call_str
|
from corelibs.debug_handling.debug_helpers import call_stack, exception_stack
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from multiprocessing import Queue
|
from multiprocessing import Queue
|
||||||
@@ -20,14 +22,10 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
# MARK: Log settings TypedDict
|
# MARK: Log settings TypedDict
|
||||||
class LogSettings(TypedDict):
|
class LogSettings(TypedDict):
|
||||||
"""
|
"""log settings, for Log setup"""
|
||||||
log settings
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
TypedDict {_type_} -- _description_
|
|
||||||
"""
|
|
||||||
log_level_console: LoggingLevel
|
log_level_console: LoggingLevel
|
||||||
log_level_file: LoggingLevel
|
log_level_file: LoggingLevel
|
||||||
|
per_run_log: bool
|
||||||
console_enabled: bool
|
console_enabled: bool
|
||||||
console_color_output_enabled: bool
|
console_color_output_enabled: bool
|
||||||
add_start_info: bool
|
add_start_info: bool
|
||||||
@@ -35,6 +33,12 @@ class LogSettings(TypedDict):
|
|||||||
log_queue: 'Queue[str] | None'
|
log_queue: 'Queue[str] | None'
|
||||||
|
|
||||||
|
|
||||||
|
class LoggerInit(TypedDict):
|
||||||
|
"""for Logger init"""
|
||||||
|
logger: logging.Logger
|
||||||
|
log_queue: 'Queue[str] | None'
|
||||||
|
|
||||||
|
|
||||||
# MARK: Custom color filter
|
# MARK: Custom color filter
|
||||||
class CustomConsoleFormatter(logging.Formatter):
|
class CustomConsoleFormatter(logging.Formatter):
|
||||||
"""
|
"""
|
||||||
@@ -73,13 +77,318 @@ class CustomConsoleFormatter(logging.Formatter):
|
|||||||
message = super().format(record)
|
message = super().format(record)
|
||||||
return f"{color}{message}{reset}"
|
return f"{color}{message}{reset}"
|
||||||
|
|
||||||
|
|
||||||
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
|
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
|
||||||
# hasattr(record, 'stack_trace')
|
# hasattr(record, 'stack_trace')
|
||||||
|
# also for something like "context" where we add an array of anything to a message
|
||||||
|
|
||||||
|
|
||||||
# MARK: Log class
|
class CustomHandlerFilter(logging.Filter):
|
||||||
class Log:
|
"""
|
||||||
|
Add a custom handler for filtering
|
||||||
|
"""
|
||||||
|
HANDLER_NAME_FILTER_EXCEPTION: str = 'console'
|
||||||
|
|
||||||
|
def __init__(self, handler_name: str, filter_exceptions: bool = False):
|
||||||
|
super().__init__(name=handler_name)
|
||||||
|
self.handler_name = handler_name
|
||||||
|
self.filter_exceptions = filter_exceptions
|
||||||
|
|
||||||
|
def filter(self, record: logging.LogRecord) -> bool:
|
||||||
|
# if console and exception do not show
|
||||||
|
if self.handler_name == self.HANDLER_NAME_FILTER_EXCEPTION and self.filter_exceptions:
|
||||||
|
return record.levelname != "EXCEPTION"
|
||||||
|
# if cnosole entry is true and traget file filter
|
||||||
|
if hasattr(record, 'console') and getattr(record, 'console') is True and self.handler_name == 'file':
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
# def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||||
|
# return record.levelname != "EXCEPTION"
|
||||||
|
|
||||||
|
|
||||||
|
# MARK: Parent class
|
||||||
|
class LogParent:
|
||||||
|
"""
|
||||||
|
Parent class with general methods
|
||||||
|
used by Log and Logger
|
||||||
|
"""
|
||||||
|
|
||||||
|
# spacer lenght characters and the character
|
||||||
|
SPACER_CHAR: str = '='
|
||||||
|
SPACER_LENGTH: int = 32
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.logger: logging.Logger
|
||||||
|
self.log_queue: 'Queue[str] | None' = None
|
||||||
|
self.handlers: dict[str, Any] = {}
|
||||||
|
|
||||||
|
# FIXME: we need to add a custom formater to add stack level listing if we want to
|
||||||
|
# Important note, although they exist, it is recommended to use self.logger.NAME directly
|
||||||
|
# so that the correct filename, method and row number is set
|
||||||
|
# for > 50 use logger.log(LoggingLevel.<LEVEL>.value, ...)
|
||||||
|
# for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True)
|
||||||
|
# MARK: log message
|
||||||
|
def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None):
|
||||||
|
"""log general"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.log(level, msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: DEBUG 10
|
||||||
|
def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""debug"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.debug(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: INFO 20
|
||||||
|
def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""info"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.info(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: WARNING 30
|
||||||
|
def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""warning"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.warning(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: ERROR 40
|
||||||
|
def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""error"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.error(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: CRITICAL 50
|
||||||
|
def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""critcal"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.critical(msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: ALERT 55
|
||||||
|
def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""alert"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
# extra_dict = dict(extra)
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: EMERGECNY: 60
|
||||||
|
def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
||||||
|
"""emergency"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
# MARK: EXCEPTION: 70
|
||||||
|
def exception(
|
||||||
|
self,
|
||||||
|
msg: object, *args: object, extra: MutableMapping[str, object] | None = None,
|
||||||
|
log_error: bool = True
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
log on exceotion level, this is log.exception, but logs with a new level
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (object): _description_
|
||||||
|
*args (object): arguments for msg
|
||||||
|
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
||||||
|
log_error: (bool): If set to false will not write additional error message for console (Default True)
|
||||||
|
"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra['stack_trace'] = call_stack(skip_last=2)
|
||||||
|
extra['exception_trace'] = exception_stack()
|
||||||
|
# write to console first with extra flag for filtering in file
|
||||||
|
if log_error:
|
||||||
|
self.logger.log(
|
||||||
|
LoggingLevel.ERROR.value,
|
||||||
|
f"<=EXCEPTION={extra['exception_trace']}> {msg} [{extra['stack_trace']}]",
|
||||||
|
*args, extra=dict(extra) | {'console': True}, stacklevel=2
|
||||||
|
)
|
||||||
|
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
|
def break_line(self, info: str = "BREAK"):
|
||||||
|
"""
|
||||||
|
add a break line as info level
|
||||||
|
|
||||||
|
Keyword Arguments:
|
||||||
|
info {str} -- _description_ (default: {"BREAK"})
|
||||||
|
"""
|
||||||
|
if not hasattr(self, 'logger'):
|
||||||
|
raise ValueError('Logger is not yet initialized')
|
||||||
|
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
|
||||||
|
|
||||||
|
# MARK: queue handling
|
||||||
|
def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool:
|
||||||
|
"""
|
||||||
|
Flush all pending messages
|
||||||
|
|
||||||
|
Keyword Arguments:
|
||||||
|
handler_name {str | None} -- _description_ (default: {None})
|
||||||
|
timeout {float} -- _description_ (default: {2.0})
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool -- _description_
|
||||||
|
"""
|
||||||
|
if not self.log_queue:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Wait for queue to be processed
|
||||||
|
start_time = time.time()
|
||||||
|
while not self.log_queue.empty() and (time.time() - start_time) < timeout:
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
# Flush all handlers or handler given
|
||||||
|
if handler_name:
|
||||||
|
try:
|
||||||
|
self.handlers[handler_name].flush()
|
||||||
|
except IndexError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
for handler in self.handlers.values():
|
||||||
|
handler.flush()
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _cleanup(self):
|
||||||
|
"""cleanup for any open queues in case we have an abort"""
|
||||||
|
if not self.log_queue:
|
||||||
|
return
|
||||||
|
self.flush()
|
||||||
|
# Close the queue properly
|
||||||
|
self.log_queue.close()
|
||||||
|
self.log_queue.join_thread()
|
||||||
|
|
||||||
|
# MARK: log level handling
|
||||||
|
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
|
||||||
|
"""
|
||||||
|
set the logging level for a handler
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
handler {str} -- _description_
|
||||||
|
log_level {LoggingLevel} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool -- _description_
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# flush queue befoe changing logging level
|
||||||
|
self.flush(handler_name)
|
||||||
|
self.handlers[handler_name].setLevel(log_level.name)
|
||||||
|
return True
|
||||||
|
except IndexError:
|
||||||
|
if self.logger:
|
||||||
|
self.logger.error('Handler %s not found, cannot change log level', handler_name)
|
||||||
|
return False
|
||||||
|
except AttributeError:
|
||||||
|
if self.logger:
|
||||||
|
self.logger.error(
|
||||||
|
'Cannot change to log level %s for handler %s, log level invalid',
|
||||||
|
LoggingLevel.name, handler_name
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_log_level(self, handler_name: str) -> LoggingLevel:
|
||||||
|
"""
|
||||||
|
gettthe logging level for a handler
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
handler_name {str} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
LoggingLevel -- _description_
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return LoggingLevel.from_any(self.handlers[handler_name].level)
|
||||||
|
except IndexError:
|
||||||
|
return LoggingLevel.NOTSET
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate_log_level(log_level: Any) -> bool:
|
||||||
|
"""
|
||||||
|
if the log level is invalid will return false, else return true
|
||||||
|
|
||||||
|
Args:
|
||||||
|
log_level (Any): _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: _description_
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
_ = LoggingLevel.from_any(log_level).value
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_log_level_int(log_level: Any) -> int:
|
||||||
|
"""
|
||||||
|
Return log level as INT
|
||||||
|
If invalid returns the default log level
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
log_level {Any} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int -- _description_
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return LoggingLevel.from_any(log_level).value
|
||||||
|
except ValueError:
|
||||||
|
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
|
||||||
|
|
||||||
|
|
||||||
|
# MARK: Logger
|
||||||
|
class Logger(LogParent):
|
||||||
|
"""
|
||||||
|
The class we can pass on to other clases without re-init the class itself
|
||||||
|
NOTE: if no queue object is handled over the logging level change might not take immediate effect
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, logger_settings: LoggerInit):
|
||||||
|
LogParent.__init__(self)
|
||||||
|
self.logger = logger_settings['logger']
|
||||||
|
self.lg = self.logger
|
||||||
|
self.l = self.logger
|
||||||
|
self.handlers = {str(_handler.name): _handler for _handler in self.logger.handlers}
|
||||||
|
self.log_queue = logger_settings['log_queue']
|
||||||
|
|
||||||
|
|
||||||
|
# MARK: LogSetup class
|
||||||
|
class Log(LogParent):
|
||||||
"""
|
"""
|
||||||
logger setup
|
logger setup
|
||||||
"""
|
"""
|
||||||
@@ -93,8 +402,9 @@ class Log:
|
|||||||
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
|
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
|
||||||
# default settings
|
# default settings
|
||||||
DEFAULT_LOG_SETTINGS: LogSettings = {
|
DEFAULT_LOG_SETTINGS: LogSettings = {
|
||||||
"log_level_console": LoggingLevel.WARNING,
|
"log_level_console": DEFAULT_LOG_LEVEL_CONSOLE,
|
||||||
"log_level_file": LoggingLevel.DEBUG,
|
"log_level_file": DEFAULT_LOG_LEVEL_FILE,
|
||||||
|
"per_run_log": False,
|
||||||
"console_enabled": True,
|
"console_enabled": True,
|
||||||
"console_color_output_enabled": True,
|
"console_color_output_enabled": True,
|
||||||
"add_start_info": True,
|
"add_start_info": True,
|
||||||
@@ -110,6 +420,7 @@ class Log:
|
|||||||
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
|
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
|
||||||
other_handlers: dict[str, Any] | None = None
|
other_handlers: dict[str, Any] | None = None
|
||||||
):
|
):
|
||||||
|
LogParent.__init__(self)
|
||||||
# add new level for alert, emergecny and exception
|
# add new level for alert, emergecny and exception
|
||||||
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
|
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
|
||||||
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
|
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
|
||||||
@@ -142,13 +453,13 @@ class Log:
|
|||||||
# in the file writer too, for the ones where color is set BEFORE the format
|
# in the file writer too, for the ones where color is set BEFORE the format
|
||||||
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
|
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
|
||||||
self.handlers: dict[str, Any] = {}
|
self.handlers: dict[str, Any] = {}
|
||||||
self.add_handler('file_handler', self.__create_time_rotating_file_handler(
|
self.add_handler('file_handler', self.__create_file_handler(
|
||||||
self.log_settings['log_level_file'], log_path)
|
'file_handler', self.log_settings['log_level_file'], log_path)
|
||||||
)
|
)
|
||||||
if self.log_settings['console_enabled']:
|
if self.log_settings['console_enabled']:
|
||||||
# console
|
# console
|
||||||
self.add_handler('stream_handler', self.__create_console_handler(
|
self.add_handler('stream_handler', self.__create_console_handler(
|
||||||
self.log_settings['log_level_console'])
|
'stream_handler', self.log_settings['log_level_console'])
|
||||||
)
|
)
|
||||||
# add other handlers,
|
# add other handlers,
|
||||||
if other_handlers is not None:
|
if other_handlers is not None:
|
||||||
@@ -194,6 +505,7 @@ class Log:
|
|||||||
default_log_settings[__log_entry] = LoggingLevel.from_any(__log_level)
|
default_log_settings[__log_entry] = LoggingLevel.from_any(__log_level)
|
||||||
# check bool
|
# check bool
|
||||||
for __log_entry in [
|
for __log_entry in [
|
||||||
|
"per_run_log",
|
||||||
"console_enabled",
|
"console_enabled",
|
||||||
"console_color_output_enabled",
|
"console_color_output_enabled",
|
||||||
"add_start_info",
|
"add_start_info",
|
||||||
@@ -211,8 +523,8 @@ class Log:
|
|||||||
default_log_settings['log_queue'] = __setting
|
default_log_settings['log_queue'] = __setting
|
||||||
return default_log_settings
|
return default_log_settings
|
||||||
|
|
||||||
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
# def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||||
return record.levelname != "EXCEPTION"
|
# return record.levelname != "EXCEPTION"
|
||||||
|
|
||||||
# MARK: add a handler
|
# MARK: add a handler
|
||||||
def add_handler(
|
def add_handler(
|
||||||
@@ -239,7 +551,8 @@ class Log:
|
|||||||
|
|
||||||
# MARK: console handler
|
# MARK: console handler
|
||||||
def __create_console_handler(
|
def __create_console_handler(
|
||||||
self, log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
|
self, handler_name: str,
|
||||||
|
log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
|
||||||
) -> logging.StreamHandler[TextIO]:
|
) -> logging.StreamHandler[TextIO]:
|
||||||
# console logger
|
# console logger
|
||||||
if not self.validate_log_level(log_level_console):
|
if not self.validate_log_level(log_level_console):
|
||||||
@@ -259,32 +572,43 @@ class Log:
|
|||||||
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
|
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
|
||||||
else:
|
else:
|
||||||
formatter_console = logging.Formatter(format_string, datefmt=format_date)
|
formatter_console = logging.Formatter(format_string, datefmt=format_date)
|
||||||
|
console_handler.set_name(handler_name)
|
||||||
console_handler.setLevel(log_level_console.name)
|
console_handler.setLevel(log_level_console.name)
|
||||||
console_handler.set_name('console')
|
|
||||||
# do not show exceptions logs on console
|
# do not show exceptions logs on console
|
||||||
if filter_exceptions:
|
console_handler.addFilter(CustomHandlerFilter('console', filter_exceptions))
|
||||||
console_handler.addFilter(self.__filter_exceptions)
|
|
||||||
console_handler.setFormatter(formatter_console)
|
console_handler.setFormatter(formatter_console)
|
||||||
return console_handler
|
return console_handler
|
||||||
|
|
||||||
# MARK: file handler
|
# MARK: file handler
|
||||||
def __create_time_rotating_file_handler(
|
def __create_file_handler(
|
||||||
self, log_level_file: LoggingLevel, log_path: Path,
|
self, handler_name: str,
|
||||||
|
log_level_file: LoggingLevel, log_path: Path,
|
||||||
|
# for TimedRotating, if per_run_log is off
|
||||||
when: str = "D", interval: int = 1, backup_count: int = 0
|
when: str = "D", interval: int = 1, backup_count: int = 0
|
||||||
) -> logging.handlers.TimedRotatingFileHandler:
|
) -> logging.handlers.TimedRotatingFileHandler | logging.FileHandler:
|
||||||
# file logger
|
# file logger
|
||||||
# when: S/M/H/D/W0-W6/midnight
|
# when: S/M/H/D/W0-W6/midnight
|
||||||
# interval: how many, 1D = every day
|
# interval: how many, 1D = every day
|
||||||
# backup_count: how many old to keep, 0 = all
|
# backup_count: how many old to keep, 0 = all
|
||||||
if not self.validate_log_level(log_level_file):
|
if not self.validate_log_level(log_level_file):
|
||||||
log_level_file = self.DEFAULT_LOG_LEVEL_FILE
|
log_level_file = self.DEFAULT_LOG_LEVEL_FILE
|
||||||
file_handler = logging.handlers.TimedRotatingFileHandler(
|
if self.log_settings['per_run_log']:
|
||||||
filename=log_path,
|
# log path, remove them stem (".log"), then add the datetime and add .log again
|
||||||
encoding="utf-8",
|
now = datetime.now()
|
||||||
when=when,
|
# we add microseconds part to get milli seconds
|
||||||
interval=interval,
|
new_stem=f"{log_path.stem}.{now.strftime('%Y-%m-%d_%H-%M-%S')}.{str(now.microsecond)[:3]}"
|
||||||
backupCount=backup_count
|
file_handler = logging.FileHandler(
|
||||||
)
|
filename=log_path.with_name(f"{new_stem}{log_path.suffix}"),
|
||||||
|
encoding="utf-8",
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
file_handler = logging.handlers.TimedRotatingFileHandler(
|
||||||
|
filename=log_path,
|
||||||
|
encoding="utf-8",
|
||||||
|
when=when,
|
||||||
|
interval=interval,
|
||||||
|
backupCount=backup_count
|
||||||
|
)
|
||||||
formatter_file_handler = logging.Formatter(
|
formatter_file_handler = logging.Formatter(
|
||||||
(
|
(
|
||||||
# time stamp
|
# time stamp
|
||||||
@@ -302,8 +626,10 @@ class Log:
|
|||||||
),
|
),
|
||||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||||
)
|
)
|
||||||
file_handler.set_name('file_timed_rotate')
|
file_handler.set_name(handler_name)
|
||||||
file_handler.setLevel(log_level_file.name)
|
file_handler.setLevel(log_level_file.name)
|
||||||
|
# do not show errors flagged with console (they are from exceptions)
|
||||||
|
file_handler.addFilter(CustomHandlerFilter('file'))
|
||||||
file_handler.setFormatter(formatter_file_handler)
|
file_handler.setFormatter(formatter_file_handler)
|
||||||
return file_handler
|
return file_handler
|
||||||
|
|
||||||
@@ -318,6 +644,7 @@ class Log:
|
|||||||
if log_queue is None:
|
if log_queue is None:
|
||||||
return
|
return
|
||||||
self.log_queue = log_queue
|
self.log_queue = log_queue
|
||||||
|
atexit.register(self.stop_listener)
|
||||||
self.listener = logging.handlers.QueueListener(
|
self.listener = logging.handlers.QueueListener(
|
||||||
self.log_queue,
|
self.log_queue,
|
||||||
*self.handlers.values(),
|
*self.handlers.values(),
|
||||||
@@ -325,6 +652,15 @@ class Log:
|
|||||||
)
|
)
|
||||||
self.listener.start()
|
self.listener.start()
|
||||||
|
|
||||||
|
def stop_listener(self):
|
||||||
|
"""
|
||||||
|
stop the listener
|
||||||
|
"""
|
||||||
|
if self.listener is not None:
|
||||||
|
self.flush()
|
||||||
|
self.listener.stop()
|
||||||
|
self._cleanup()
|
||||||
|
|
||||||
# MARK: init main log
|
# MARK: init main log
|
||||||
def __init_log(self, log_name: str) -> None:
|
def __init_log(self, log_name: str) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -367,239 +703,16 @@ class Log:
|
|||||||
|
|
||||||
return root_logger
|
return root_logger
|
||||||
|
|
||||||
# FIXME: we need to add a custom formater to add stack level listing if we want to
|
def get_logger_settings(self) -> LoggerInit:
|
||||||
# Important note, although they exist, it is recommended to use self.logger.NAME directly
|
|
||||||
# so that the correct filename, method and row number is set
|
|
||||||
# for > 50 use logger.log(LoggingLevel.<LEVEL>.value, ...)
|
|
||||||
# for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True)
|
|
||||||
# MARK: log message
|
|
||||||
def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None):
|
|
||||||
"""log general"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(level, msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: DEBUG 10
|
|
||||||
def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""debug"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.debug(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: INFO 20
|
|
||||||
def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""info"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.info(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: WARNING 30
|
|
||||||
def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""warning"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.warning(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: ERROR 40
|
|
||||||
def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""error"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.error(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: CRITICAL 50
|
|
||||||
def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""critcal"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.critical(msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: ALERT 55
|
|
||||||
def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""alert"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
# extra_dict = dict(extra)
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: EMERGECNY: 60
|
|
||||||
def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""emergency"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: EXCEPTION: 70
|
|
||||||
def exception(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
|
||||||
"""
|
"""
|
||||||
log on exceotion level, this is log.exception, but logs with a new level
|
get the logger settings we need to init the Logger class
|
||||||
|
|
||||||
Args:
|
|
||||||
msg (object): _description_
|
|
||||||
*args (object): arguments for msg
|
|
||||||
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
|
||||||
"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
if extra is None:
|
|
||||||
extra = {}
|
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
|
||||||
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
|
|
||||||
|
|
||||||
# MARK: break line
|
|
||||||
def break_line(self, info: str = "BREAK"):
|
|
||||||
"""
|
|
||||||
add a break line as info level
|
|
||||||
|
|
||||||
Keyword Arguments:
|
|
||||||
info {str} -- _description_ (default: {"BREAK"})
|
|
||||||
"""
|
|
||||||
if not hasattr(self, 'logger'):
|
|
||||||
raise ValueError('Logger is not yet initialized')
|
|
||||||
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
|
|
||||||
|
|
||||||
# MARK: queue handling
|
|
||||||
def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool:
|
|
||||||
"""
|
|
||||||
Flush all pending messages
|
|
||||||
|
|
||||||
Keyword Arguments:
|
|
||||||
handler_name {str | None} -- _description_ (default: {None})
|
|
||||||
timeout {float} -- _description_ (default: {2.0})
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
bool -- _description_
|
LoggerInit -- _description_
|
||||||
"""
|
"""
|
||||||
if not self.listener or not self.log_queue:
|
return {
|
||||||
return False
|
"logger": self.logger,
|
||||||
|
"log_queue": self.log_queue
|
||||||
try:
|
}
|
||||||
# Wait for queue to be processed
|
|
||||||
start_time = time.time()
|
|
||||||
while not self.log_queue.empty() and (time.time() - start_time) < timeout:
|
|
||||||
time.sleep(0.01)
|
|
||||||
|
|
||||||
# Flush all handlers or handler given
|
|
||||||
if handler_name:
|
|
||||||
try:
|
|
||||||
self.handlers[handler_name].flush()
|
|
||||||
except IndexError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
for handler in self.handlers.values():
|
|
||||||
handler.flush()
|
|
||||||
except OSError:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def stop_listener(self):
|
|
||||||
"""
|
|
||||||
stop the listener
|
|
||||||
"""
|
|
||||||
if self.listener is not None:
|
|
||||||
self.flush()
|
|
||||||
self.listener.stop()
|
|
||||||
|
|
||||||
# MARK: log level handling
|
|
||||||
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
|
|
||||||
"""
|
|
||||||
set the logging level for a handler
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
handler {str} -- _description_
|
|
||||||
log_level {LoggingLevel} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool -- _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# flush queue befoe changing logging level
|
|
||||||
self.flush(handler_name)
|
|
||||||
self.handlers[handler_name].setLevel(log_level.name)
|
|
||||||
return True
|
|
||||||
except IndexError:
|
|
||||||
if self.logger:
|
|
||||||
self.logger.error('Handler %s not found, cannot change log level', handler_name)
|
|
||||||
return False
|
|
||||||
except AttributeError:
|
|
||||||
if self.logger:
|
|
||||||
self.logger.error(
|
|
||||||
'Cannot change to log level %s for handler %s, log level invalid',
|
|
||||||
LoggingLevel.name, handler_name
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_log_level(self, handler_name: str) -> LoggingLevel:
|
|
||||||
"""
|
|
||||||
gettthe logging level for a handler
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
handler_name {str} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
LoggingLevel -- _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return self.handlers[handler_name]
|
|
||||||
except IndexError:
|
|
||||||
return LoggingLevel.NOTSET
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def validate_log_level(log_level: Any) -> bool:
|
|
||||||
"""
|
|
||||||
if the log level is invalid will return false, else return true
|
|
||||||
|
|
||||||
Args:
|
|
||||||
log_level (Any): _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
_ = LoggingLevel.from_any(log_level).value
|
|
||||||
return True
|
|
||||||
except ValueError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get_log_level_int(log_level: Any) -> int:
|
|
||||||
"""
|
|
||||||
Return log level as INT
|
|
||||||
If invalid returns the default log level
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
log_level {Any} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
int -- _description_
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
return LoggingLevel.from_any(log_level).value
|
|
||||||
except ValueError:
|
|
||||||
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
|
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
20
src/corelibs/requests_handling/auth_helpers.py
Normal file
20
src/corelibs/requests_handling/auth_helpers.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
"""
|
||||||
|
Various HTTP auth helpers
|
||||||
|
"""
|
||||||
|
|
||||||
|
from base64 import b64encode
|
||||||
|
|
||||||
|
|
||||||
|
def basic_auth(username: str, password: str) -> str:
|
||||||
|
"""
|
||||||
|
setup basic auth, for debug
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
username {str} -- _description_
|
||||||
|
password {str} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str -- _description_
|
||||||
|
"""
|
||||||
|
token = b64encode(f"{username}:{password}".encode('utf-8')).decode("ascii")
|
||||||
|
return f'Basic {token}'
|
||||||
@@ -2,8 +2,18 @@
|
|||||||
Current timestamp strings and time zones
|
Current timestamp strings and time zones
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
||||||
|
from corelibs.var_handling.var_helpers import is_float
|
||||||
|
|
||||||
|
|
||||||
|
class TimeParseError(Exception):
|
||||||
|
"""Custom exception for time parsing errors."""
|
||||||
|
|
||||||
|
|
||||||
|
class TimeUnitError(Exception):
|
||||||
|
"""Custom exception for time parsing errors."""
|
||||||
|
|
||||||
|
|
||||||
class TimestampStrings:
|
class TimestampStrings:
|
||||||
@@ -24,3 +34,79 @@ class TimestampStrings:
|
|||||||
self.timestamp = self.timestamp_now.strftime("%Y-%m-%d %H:%M:%S")
|
self.timestamp = self.timestamp_now.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
self.timestamp_tz = self.timestamp_now_tz.strftime("%Y-%m-%d %H:%M:%S %Z")
|
self.timestamp_tz = self.timestamp_now_tz.strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||||
self.timestamp_file = self.timestamp_now.strftime("%Y-%m-%d_%H%M%S")
|
self.timestamp_file = self.timestamp_now.strftime("%Y-%m-%d_%H%M%S")
|
||||||
|
|
||||||
|
|
||||||
|
def convert_to_seconds(time_string: str | int | float) -> int:
|
||||||
|
"""
|
||||||
|
Conver a string with time units into a seconds string
|
||||||
|
The following units are allowed
|
||||||
|
Y: 365 days
|
||||||
|
M: 30 days
|
||||||
|
d, h, m, s
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
time_string {str} -- _description_
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int -- _description_
|
||||||
|
"""
|
||||||
|
|
||||||
|
# skip out if this is a number of any type
|
||||||
|
# numbers will br made float, rounded and then converted to int
|
||||||
|
if is_float(time_string):
|
||||||
|
return int(round(float(time_string)))
|
||||||
|
time_string = str(time_string)
|
||||||
|
|
||||||
|
# Define time unit conversion factors
|
||||||
|
unit_factors: dict[str, int] = {
|
||||||
|
'Y': 31536000, # 365 days * 86400 seconds/day
|
||||||
|
'M': 2592000 * 12, # 1 year in seconds (assuming 365 days per year)
|
||||||
|
'd': 86400, # 1 day in seconds
|
||||||
|
'h': 3600, # 1 hour in seconds
|
||||||
|
'm': 60, # minutes to seconds
|
||||||
|
's': 1 # 1 second in seconds
|
||||||
|
}
|
||||||
|
long_unit_names: dict[str, str] = {
|
||||||
|
'year': 'Y',
|
||||||
|
'years': 'Y',
|
||||||
|
'month': 'M',
|
||||||
|
'months': 'M',
|
||||||
|
'day': 'd',
|
||||||
|
'days': 'd',
|
||||||
|
'hour': 'h',
|
||||||
|
'hours': 'h',
|
||||||
|
'minute': 'm',
|
||||||
|
'minutes': 'm',
|
||||||
|
'min': 'm',
|
||||||
|
'second': 's',
|
||||||
|
'seconds': 's',
|
||||||
|
'sec': 's',
|
||||||
|
}
|
||||||
|
|
||||||
|
total_seconds = 0
|
||||||
|
|
||||||
|
seen_units: list[str] = [] # Track units that have been encountered
|
||||||
|
|
||||||
|
# Use regex to match number and time unit pairs
|
||||||
|
for match in re.finditer(r'(\d+)\s*([a-zA-Z]+)', time_string):
|
||||||
|
value, unit = int(match.group(1)), match.group(2)
|
||||||
|
|
||||||
|
# full name check, fallback to original name
|
||||||
|
unit = long_unit_names.get(unit.lower(), unit)
|
||||||
|
|
||||||
|
# Check for duplicate units
|
||||||
|
if unit in seen_units:
|
||||||
|
raise TimeParseError(f"Unit '{unit}' appears more than once.")
|
||||||
|
# Check invalid unit
|
||||||
|
if unit not in unit_factors:
|
||||||
|
raise TimeUnitError(f"Unit '{unit}' is not a valid unit name.")
|
||||||
|
# Add to total seconds based on the units
|
||||||
|
if unit in unit_factors:
|
||||||
|
total_seconds += value * unit_factors[unit]
|
||||||
|
|
||||||
|
seen_units.append(unit)
|
||||||
|
|
||||||
|
return total_seconds
|
||||||
|
|||||||
52
test-run/iterator_handling/data_search.py
Normal file
52
test-run/iterator_handling/data_search.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
"""
|
||||||
|
Search data tests
|
||||||
|
iterator_handling.data_search
|
||||||
|
"""
|
||||||
|
|
||||||
|
from corelibs.debug_handling.dump_data import dump_data
|
||||||
|
from corelibs.iterator_handling.data_search import find_in_array_from_list, ArraySearchList
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""
|
||||||
|
Comment
|
||||||
|
"""
|
||||||
|
data = [
|
||||||
|
{
|
||||||
|
"lookup_value_p": "A01",
|
||||||
|
"lookup_value_c": "B01",
|
||||||
|
"replace_value": "R01",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lookup_value_p": "A02",
|
||||||
|
"lookup_value_c": "B02",
|
||||||
|
"replace_value": "R02",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
test_foo = ArraySearchList(
|
||||||
|
key = "lookup_value_p",
|
||||||
|
value = "A01"
|
||||||
|
)
|
||||||
|
print(test_foo)
|
||||||
|
search: list[ArraySearchList] = [
|
||||||
|
{
|
||||||
|
"key": "lookup_value_p",
|
||||||
|
"value": "A01"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "lookup_value_c",
|
||||||
|
"value": "B01"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
result = find_in_array_from_list(data, search)
|
||||||
|
|
||||||
|
print(f"Search {dump_data(search)} -> {dump_data(result)}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
||||||
|
# __END__
|
||||||
106
test-run/iterator_handling/dict_helpers.py
Normal file
106
test-run/iterator_handling/dict_helpers.py
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
"""
|
||||||
|
Iterator helper testing
|
||||||
|
"""
|
||||||
|
|
||||||
|
from corelibs.debug_handling.dump_data import dump_data
|
||||||
|
from corelibs.iterator_handling.dict_helpers import mask
|
||||||
|
|
||||||
|
|
||||||
|
def __mask():
|
||||||
|
data = {
|
||||||
|
# "user": "john",
|
||||||
|
# "encryption_key": "Secret key",
|
||||||
|
# "ENCRYPTION.TEST": "Secret key test",
|
||||||
|
# "inside_password_test": "Hide this",
|
||||||
|
"password": ["secret1", "secret2"], # List value gets masked
|
||||||
|
# "config": {
|
||||||
|
# "db_password": {"primary": "secret", "backup": "secret2"}, # Dict value gets masked
|
||||||
|
# "api_keys": ["key1", "key2", "key3"] # List value gets masked
|
||||||
|
# },
|
||||||
|
# "items": [ # List value that doesn't get masked, but gets processed recursively
|
||||||
|
# {"name": "item1", "secret_key": "itemsecret"},
|
||||||
|
# {"name": "item2", "passwords": ["pass1", "pass2"]}
|
||||||
|
# ],
|
||||||
|
# "normal_list": ["item1", "item2", "item3"] # Normal list, not masked
|
||||||
|
}
|
||||||
|
data = {
|
||||||
|
"config": {
|
||||||
|
# "password": ["secret1", "secret2"],
|
||||||
|
# "password_other": {"password": ["secret1", "secret2"]},
|
||||||
|
# "database": {
|
||||||
|
# "host": "localhost",
|
||||||
|
# "password": "db_secret",
|
||||||
|
# "users": [
|
||||||
|
# {"name": "admin", "password": "admin123"},
|
||||||
|
# {"name": "user", "secret_key": "user456"}
|
||||||
|
# ]
|
||||||
|
# },
|
||||||
|
# "api": {
|
||||||
|
# # "endpoints": ["api1", "api2"],
|
||||||
|
# "encryption_settings": {
|
||||||
|
# "enabled": True,
|
||||||
|
# "secret": "api_secret"
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
"secret_key": "normal_value",
|
||||||
|
"api_key": "normal_value",
|
||||||
|
"my_key_value": "normal_value",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
data = {
|
||||||
|
"basic": {
|
||||||
|
"log_level_console": "DEBUG",
|
||||||
|
"log_level_file": "DEBUG",
|
||||||
|
"storage_interface": "sqlite",
|
||||||
|
"content_start_date": "2023-1-1",
|
||||||
|
"encryption_key": "ENCRYPTION_KEY"
|
||||||
|
},
|
||||||
|
"email": {
|
||||||
|
"alert_email": [
|
||||||
|
"test+z-sd@tequila.jp"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"poller": {
|
||||||
|
"max_forks": "1",
|
||||||
|
"interface": "Zac"
|
||||||
|
},
|
||||||
|
"pusher": {
|
||||||
|
"max_forks": "3",
|
||||||
|
"interface": "Screendragon"
|
||||||
|
},
|
||||||
|
"api:Zac": {
|
||||||
|
"type": "zac",
|
||||||
|
"client_id": "oro_zac_demo",
|
||||||
|
"client_secret": "CLIENT_SECRET",
|
||||||
|
"username": "zacuser",
|
||||||
|
"password": "ZACuser3",
|
||||||
|
"hostname": "e-gra2.zac.ai",
|
||||||
|
"appname": "e-gra2_api_trial",
|
||||||
|
"api_path": "b/api/v2"
|
||||||
|
},
|
||||||
|
"api:Screendragon": {
|
||||||
|
"type": "screendragon",
|
||||||
|
"client_id": "omniprostaging",
|
||||||
|
"encryption_client": "SOME_SECRET",
|
||||||
|
"client_encryption": "SOME_SECRET",
|
||||||
|
"secret_client": "SOME_SECRET",
|
||||||
|
"client_secret": "SOME_SECRET",
|
||||||
|
"hostname": "omniprostaging.screendragon.com",
|
||||||
|
"appname": "sdapi",
|
||||||
|
"api_path": "api"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = mask(data)
|
||||||
|
print(f"** In: {dump_data(data)}")
|
||||||
|
print(f"===> Masked: {dump_data(result)}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Test: corelibs.string_handling.string_helpers
|
||||||
|
"""
|
||||||
|
__mask()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
52
test-run/json_handling/jmespath_helper.py
Normal file
52
test-run/json_handling/jmespath_helper.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
"""
|
||||||
|
jmes path testing
|
||||||
|
"""
|
||||||
|
|
||||||
|
from corelibs.debug_handling.dump_data import dump_data
|
||||||
|
from corelibs.json_handling.jmespath_helper import jmespath_search
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""
|
||||||
|
Comment
|
||||||
|
"""
|
||||||
|
__set = {
|
||||||
|
'a': 'b',
|
||||||
|
'foobar': [1, 2, 'a'],
|
||||||
|
'bar': {
|
||||||
|
'a': 1,
|
||||||
|
'b': 'c'
|
||||||
|
},
|
||||||
|
'baz': [
|
||||||
|
{
|
||||||
|
'aa': 1,
|
||||||
|
'ab': 'cc'
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'ba': 2,
|
||||||
|
'bb': 'dd'
|
||||||
|
},
|
||||||
|
],
|
||||||
|
'foo': {
|
||||||
|
'a': [1, 2, 3],
|
||||||
|
'b': ['a', 'b', 'c']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
__get = [
|
||||||
|
'a',
|
||||||
|
'bar.a',
|
||||||
|
'foo.a',
|
||||||
|
'baz[].aa'
|
||||||
|
]
|
||||||
|
for __jmespath in __get:
|
||||||
|
result = jmespath_search(__set, __jmespath)
|
||||||
|
print(f"GET {__jmespath}: {dump_data(result)}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
||||||
|
# __END__
|
||||||
@@ -3,9 +3,11 @@ Log logging_handling.log testing
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# import atexit
|
# import atexit
|
||||||
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
# this is for testing only
|
# this is for testing only
|
||||||
from corelibs.logging_handling.log import Log
|
from corelibs.logging_handling.log import Log, Logger
|
||||||
|
from corelibs.debug_handling.debug_helpers import exception_stack, call_stack
|
||||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||||
|
|
||||||
|
|
||||||
@@ -18,16 +20,20 @@ def main():
|
|||||||
log_path=script_path.joinpath('log', 'test.log'),
|
log_path=script_path.joinpath('log', 'test.log'),
|
||||||
log_name="Test Log",
|
log_name="Test Log",
|
||||||
log_settings={
|
log_settings={
|
||||||
# "log_level_console": 'DEBUG',
|
"log_level_console": 'DEBUG',
|
||||||
"log_level_console": None,
|
# "log_level_console": None,
|
||||||
"log_level_file": 'DEBUG',
|
"log_level_file": 'DEBUG',
|
||||||
# "console_color_output_enabled": False,
|
# "console_color_output_enabled": False,
|
||||||
|
"per_run_log": True
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
logn = Logger(log.get_logger_settings())
|
||||||
|
|
||||||
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
|
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||||
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
|
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||||
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
|
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
|
||||||
|
logn.lg.debug('[NORMAL N] Debug test: %s', log.logger.name)
|
||||||
|
logn.debug('[NORMAL N-] Debug test: %s', log.logger.name)
|
||||||
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
|
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
|
||||||
log.info('[NORMAL-] Info test: %s', log.logger.name)
|
log.info('[NORMAL-] Info test: %s', log.logger.name)
|
||||||
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
|
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
|
||||||
@@ -75,11 +81,16 @@ def main():
|
|||||||
__test = 5 / 0
|
__test = 5 / 0
|
||||||
print(f"Divied: {__test}")
|
print(f"Divied: {__test}")
|
||||||
except ZeroDivisionError as e:
|
except ZeroDivisionError as e:
|
||||||
|
print(f"** sys.exec_info(): {sys.exc_info()}")
|
||||||
|
print(f"** sys.exec_info(): [{exception_stack()}] | [{exception_stack(sys.exc_info())}] | [{call_stack()}]")
|
||||||
log.logger.critical("Divison through zero: %s", e)
|
log.logger.critical("Divison through zero: %s", e)
|
||||||
log.exception("Divison through zero")
|
log.exception("Divison through zero: %s", e)
|
||||||
|
|
||||||
for handler in log.logger.handlers:
|
for handler in log.logger.handlers:
|
||||||
print(f"Handler (logger) {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
print(
|
||||||
|
f"** Handler (logger) {handler} [{handler.name}] -> "
|
||||||
|
f"{handler.level} -> {LoggingLevel.from_any(handler.level)}"
|
||||||
|
)
|
||||||
|
|
||||||
for key, handler in log.handlers.items():
|
for key, handler in log.handlers.items():
|
||||||
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
||||||
|
|||||||
54
test-run/string_handling/timestamp_strings.py
Normal file
54
test-run/string_handling/timestamp_strings.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
"""
|
||||||
|
timestamp string checks
|
||||||
|
"""
|
||||||
|
|
||||||
|
from corelibs.string_handling.timestamp_strings import convert_to_seconds, TimeParseError, TimeUnitError
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""
|
||||||
|
Comment
|
||||||
|
"""
|
||||||
|
test_cases = [
|
||||||
|
"5M 6d", # 5 months, 6 days
|
||||||
|
"2h 30m 45s", # 2 hours, 30 minutes, 45 seconds
|
||||||
|
"1Y 2M 3d", # 1 year, 2 months, 3 days
|
||||||
|
"1h", # 1 hour
|
||||||
|
"30m", # 30 minutes
|
||||||
|
"2 hours 15 minutes", # 2 hours, 15 minutes
|
||||||
|
"1d 12h", # 1 day, 12 hours
|
||||||
|
"3M 2d 4h", # 3 months, 2 days, 4 hours
|
||||||
|
"45s", # 45 seconds
|
||||||
|
"1 year 2 months", # 1 year, 2 months
|
||||||
|
"2Y 6M 15d 8h 30m 45s", # Complex example
|
||||||
|
# ]
|
||||||
|
# invalid_test_cases = [
|
||||||
|
"5M 6d 2M", # months appears twice
|
||||||
|
"2h 30m 45s 1h", # hours appears twice
|
||||||
|
"1d 2 days", # days appears twice (short and long form)
|
||||||
|
"30m 45 minutes", # minutes appears twice
|
||||||
|
"1Y 2 years", # years appears twice
|
||||||
|
"1x 2 yrs", # invalid names
|
||||||
|
|
||||||
|
123, # int
|
||||||
|
789.12, # float
|
||||||
|
456.56, # float, high
|
||||||
|
"4566", # int as string
|
||||||
|
"5551.12", # float as string
|
||||||
|
"5551.56", # float, high as string
|
||||||
|
]
|
||||||
|
|
||||||
|
for time_string in test_cases:
|
||||||
|
try:
|
||||||
|
result = convert_to_seconds(time_string)
|
||||||
|
print(f"{time_string} => {result}")
|
||||||
|
except (TimeParseError, TimeUnitError) as e:
|
||||||
|
print(f"Error encountered for {time_string}: {type(e).__name__}: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
||||||
|
# __END__
|
||||||
291
tests/unit/iterator_handling/test_dict_helpers.py
Normal file
291
tests/unit/iterator_handling/test_dict_helpers.py
Normal file
@@ -0,0 +1,291 @@
|
|||||||
|
"""
|
||||||
|
tests for corelibs.iterator_handling.dict_helpers
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from typing import Any
|
||||||
|
from corelibs.iterator_handling.dict_helpers import mask
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_default_behavior():
|
||||||
|
"""Test masking with default mask_keys"""
|
||||||
|
data = {
|
||||||
|
"username": "john_doe",
|
||||||
|
"password": "secret123",
|
||||||
|
"email": "john@example.com",
|
||||||
|
"api_secret": "abc123",
|
||||||
|
"encryption_key": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["username"] == "john_doe"
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert result["email"] == "john@example.com"
|
||||||
|
assert result["api_secret"] == "***"
|
||||||
|
assert result["encryption_key"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_keys():
|
||||||
|
"""Test masking with custom mask_keys"""
|
||||||
|
data = {
|
||||||
|
"username": "john_doe",
|
||||||
|
"token": "abc123",
|
||||||
|
"api_key": "xyz789",
|
||||||
|
"password": "secret123"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=["token", "api"])
|
||||||
|
|
||||||
|
assert result["username"] == "john_doe"
|
||||||
|
assert result["token"] == "***"
|
||||||
|
assert result["api_key"] == "***"
|
||||||
|
assert result["password"] == "secret123" # Not masked with custom keys
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_mask_string():
|
||||||
|
"""Test masking with custom mask string"""
|
||||||
|
data = {"password": "secret123"}
|
||||||
|
|
||||||
|
result = mask(data, mask_str="[HIDDEN]")
|
||||||
|
|
||||||
|
assert result["password"] == "[HIDDEN]"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_case_insensitive():
|
||||||
|
"""Test that masking is case insensitive"""
|
||||||
|
data = {
|
||||||
|
"PASSWORD": "secret123",
|
||||||
|
"Secret_Key": "abc123",
|
||||||
|
"ENCRYPTION_data": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["PASSWORD"] == "***"
|
||||||
|
assert result["Secret_Key"] == "***"
|
||||||
|
assert result["ENCRYPTION_data"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_key_patterns():
|
||||||
|
"""Test different key matching patterns (start, end, contains)"""
|
||||||
|
data = {
|
||||||
|
"password_hash": "hash123", # starts with
|
||||||
|
"user_password": "secret123", # ends with
|
||||||
|
"my_secret_key": "abc123", # contains with edges
|
||||||
|
"secretvalue": "xyz789", # contains without edges
|
||||||
|
"startsecretvalue": "xyz123", # contains without edges
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["password_hash"] == "***"
|
||||||
|
assert result["user_password"] == "***"
|
||||||
|
assert result["my_secret_key"] == "***"
|
||||||
|
assert result["secretvalue"] == "***" # will mask beacuse starts with
|
||||||
|
assert result["startsecretvalue"] == "xyz123" # will not mask
|
||||||
|
assert result["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_edges():
|
||||||
|
"""Test masking with custom edge characters"""
|
||||||
|
data = {
|
||||||
|
"my-secret-key": "abc123",
|
||||||
|
"my_secret_key": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, mask_str_edges="-")
|
||||||
|
|
||||||
|
assert result["my-secret-key"] == "***"
|
||||||
|
assert result["my_secret_key"] == "xyz789" # Underscore edges don't match
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_empty_edges():
|
||||||
|
"""Test masking with empty edge characters (substring matching)"""
|
||||||
|
data = {
|
||||||
|
"secretvalue": "abc123",
|
||||||
|
"mysecretkey": "xyz789",
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, mask_str_edges="")
|
||||||
|
|
||||||
|
assert result["secretvalue"] == "***"
|
||||||
|
assert result["mysecretkey"] == "***"
|
||||||
|
assert result["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_nested_dict():
|
||||||
|
"""Test masking nested dictionaries"""
|
||||||
|
data = {
|
||||||
|
"user": {
|
||||||
|
"name": "john",
|
||||||
|
"password": "secret123",
|
||||||
|
"profile": {
|
||||||
|
"email": "john@example.com",
|
||||||
|
"encryption_key": "abc123"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"api_secret": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["user"]["name"] == "john"
|
||||||
|
assert result["user"]["password"] == "***"
|
||||||
|
assert result["user"]["profile"]["email"] == "john@example.com"
|
||||||
|
assert result["user"]["profile"]["encryption_key"] == "***"
|
||||||
|
assert result["api_secret"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_lists():
|
||||||
|
"""Test masking lists and nested structures with lists"""
|
||||||
|
data = {
|
||||||
|
"users": [
|
||||||
|
{"name": "john", "password": "secret1"},
|
||||||
|
{"name": "jane", "password": "secret2"}
|
||||||
|
],
|
||||||
|
"secrets": ["secret1", "secret2", "secret3"]
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
print(f"R {result['secrets']}")
|
||||||
|
|
||||||
|
assert result["users"][0]["name"] == "john"
|
||||||
|
assert result["users"][0]["password"] == "***"
|
||||||
|
assert result["users"][1]["name"] == "jane"
|
||||||
|
assert result["users"][1]["password"] == "***"
|
||||||
|
assert result["secrets"] == ["***", "***", "***"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_mixed_types():
|
||||||
|
"""Test masking with different value types"""
|
||||||
|
data = {
|
||||||
|
"password": "string_value",
|
||||||
|
"secret_number": 12345,
|
||||||
|
"encryption_flag": True,
|
||||||
|
"secret_float": 3.14,
|
||||||
|
"password_none": None,
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert result["secret_number"] == "***"
|
||||||
|
assert result["encryption_flag"] == "***"
|
||||||
|
assert result["secret_float"] == "***"
|
||||||
|
assert result["password_none"] == "***"
|
||||||
|
assert result["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_skip_true():
|
||||||
|
"""Test that skip=True returns original data unchanged"""
|
||||||
|
data = {
|
||||||
|
"password": "secret123",
|
||||||
|
"encryption_key": "abc123",
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, skip=True)
|
||||||
|
|
||||||
|
assert result == data
|
||||||
|
assert result is data # Should return the same object
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_empty_dict():
|
||||||
|
"""Test masking empty dictionary"""
|
||||||
|
data: dict[str, Any] = {}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_none_mask_keys():
|
||||||
|
"""Test explicit None mask_keys uses defaults"""
|
||||||
|
data = {"password": "secret123", "token": "abc123"}
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=None)
|
||||||
|
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert result["token"] == "abc123" # Not in default keys
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_empty_mask_keys():
|
||||||
|
"""Test empty mask_keys list"""
|
||||||
|
data = {"password": "secret123", "secret": "abc123"}
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=[])
|
||||||
|
|
||||||
|
assert result["password"] == "secret123"
|
||||||
|
assert result["secret"] == "abc123"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_complex_nested_structure():
|
||||||
|
"""Test masking complex nested structure"""
|
||||||
|
data = {
|
||||||
|
"config": {
|
||||||
|
"database": {
|
||||||
|
"host": "localhost",
|
||||||
|
"password": "db_secret",
|
||||||
|
"users": [
|
||||||
|
{"name": "admin", "password": "admin123"},
|
||||||
|
{"name": "user", "secret_key": "user456"}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"api": {
|
||||||
|
"endpoints": ["api1", "api2"],
|
||||||
|
"encryption_settings": {
|
||||||
|
"enabled": True,
|
||||||
|
"secret": "api_secret"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["config"]["database"]["host"] == "localhost"
|
||||||
|
assert result["config"]["database"]["password"] == "***"
|
||||||
|
assert result["config"]["database"]["users"][0]["name"] == "admin"
|
||||||
|
assert result["config"]["database"]["users"][0]["password"] == "***"
|
||||||
|
assert result["config"]["database"]["users"][1]["name"] == "user"
|
||||||
|
assert result["config"]["database"]["users"][1]["secret_key"] == "***"
|
||||||
|
assert result["config"]["api"]["endpoints"] == ["api1", "api2"]
|
||||||
|
assert result["config"]["api"]["encryption_settings"]["enabled"] is True
|
||||||
|
assert result["config"]["api"]["encryption_settings"]["secret"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_preserves_original_data():
|
||||||
|
"""Test that original data is not modified"""
|
||||||
|
original_data = {
|
||||||
|
"password": "secret123",
|
||||||
|
"username": "john_doe"
|
||||||
|
}
|
||||||
|
data_copy = original_data.copy()
|
||||||
|
|
||||||
|
result = mask(original_data)
|
||||||
|
|
||||||
|
assert original_data == data_copy # Original unchanged
|
||||||
|
assert result != original_data # Result is different
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert original_data["password"] == "secret123"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("mask_key,expected_keys", [
|
||||||
|
(["pass"], ["password", "user_pass", "my_pass_key"]),
|
||||||
|
(["key"], ["api_key", "secret_key", "my_key_value"]),
|
||||||
|
(["token"], ["token", "auth_token", "my_token_here"]),
|
||||||
|
])
|
||||||
|
def test_mask_parametrized_keys(mask_key: list[str], expected_keys: list[str]):
|
||||||
|
"""Parametrized test for different mask key patterns"""
|
||||||
|
data = {key: "value" for key in expected_keys}
|
||||||
|
data["normal_entry"] = "normal_value"
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=mask_key)
|
||||||
|
|
||||||
|
for key in expected_keys:
|
||||||
|
assert result[key] == "***"
|
||||||
|
assert result["normal_entry"] == "normal_value"
|
||||||
4
uv.lock
generated
4
uv.lock
generated
@@ -1,5 +1,5 @@
|
|||||||
version = 1
|
version = 1
|
||||||
revision = 2
|
revision = 3
|
||||||
requires-python = ">=3.13"
|
requires-python = ">=3.13"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -44,7 +44,7 @@ wheels = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "corelibs"
|
name = "corelibs"
|
||||||
version = "0.12.6"
|
version = "0.22.2"
|
||||||
source = { editable = "." }
|
source = { editable = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "jmespath" },
|
{ name = "jmespath" },
|
||||||
|
|||||||
Reference in New Issue
Block a user