Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea527ea60c | ||
|
|
fd5e1db22b | ||
|
|
39e23faf7f | ||
|
|
de285b531a | ||
|
|
0a29a592f9 | ||
|
|
e045b1d3b5 | ||
|
|
280e5fa861 | ||
|
|
472d3495b5 | ||
|
|
2778ac6870 | ||
|
|
743a0a8ac9 | ||
|
|
694712ed2e | ||
|
|
ea3b4f1790 | ||
|
|
da68818d4f | ||
|
|
db6a3b53c5 | ||
|
|
82b089498e | ||
|
|
948b0dd5e7 | ||
|
|
4acc0b51b1 | ||
|
|
a626b738a9 |
4
ToDo.md
4
ToDo.md
@@ -1,5 +1,5 @@
|
|||||||
# ToDo list
|
# ToDo list
|
||||||
|
|
||||||
- [ ] stub files .pyi
|
- [x] stub files .pyi
|
||||||
- [ ] Add tests for all, we need 100% test coverate
|
- [ ] Add tests for all, we need 100% test coverate
|
||||||
- [ ] Log: add custom format for "stack_correct" if set, this will override the normal stack block
|
- [x] Log: add custom format for "stack_correct" if set, this will override the normal stack block
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
# MARK: Project info
|
# MARK: Project info
|
||||||
[project]
|
[project]
|
||||||
name = "corelibs"
|
name = "corelibs"
|
||||||
version = "0.12.6"
|
version = "0.15.0"
|
||||||
description = "Collection of utils for Python scripts"
|
description = "Collection of utils for Python scripts"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.13"
|
requires-python = ">=3.13"
|
||||||
|
|||||||
@@ -5,10 +5,33 @@ List of regex compiled strings that can be used
|
|||||||
import re
|
import re
|
||||||
|
|
||||||
|
|
||||||
EMAIL_REGEX_BASIC = r"""
|
def compile_re(reg: str) -> re.Pattern[str]:
|
||||||
|
"""
|
||||||
|
compile a regex with verbose flag
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
reg {str} -- _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
re.Pattern[str] -- _description_
|
||||||
|
"""
|
||||||
|
return re.compile(reg, re.VERBOSE)
|
||||||
|
|
||||||
|
|
||||||
|
# email regex
|
||||||
|
EMAIL_BASIC_REGEX: str = r"""
|
||||||
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
|
||||||
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
|
||||||
"""
|
"""
|
||||||
EMAIL_REGEX_BASIC_COMPILED = re.compile(EMAIL_REGEX_BASIC)
|
# Domain regex with localhost
|
||||||
|
DOMAIN_WITH_LOCALHOST_REGEX: str = r"""
|
||||||
|
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})$
|
||||||
|
"""
|
||||||
|
# domain regex with loclhost and optional port
|
||||||
|
DOMAIN_WITH_LOCALHOST_PORT_REGEX: str = r"""
|
||||||
|
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})(?::\d+)?$
|
||||||
|
"""
|
||||||
|
# Domain, no localhost
|
||||||
|
DOMAIN_REGEX: str = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$"
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -258,6 +258,12 @@ class SettingsLoader:
|
|||||||
self.__build_from_to_equal(entry, check)
|
self.__build_from_to_equal(entry, check)
|
||||||
):
|
):
|
||||||
error = True
|
error = True
|
||||||
|
# after post clean up if we have empty entries and we are mandatory
|
||||||
|
if check == "mandatory:yes" and (
|
||||||
|
not settings[config_id].get(entry) or settings[config_id].get(entry) == ['']
|
||||||
|
):
|
||||||
|
error = True
|
||||||
|
self.__print(f"[!] Missing content entry for: {entry}", 'ERROR')
|
||||||
if error is True:
|
if error is True:
|
||||||
raise ValueError(self.__print("[!] Missing or incorrect settings data. Cannot proceed", 'CRITICAL'))
|
raise ValueError(self.__print("[!] Missing or incorrect settings data. Cannot proceed", 'CRITICAL'))
|
||||||
# set empty
|
# set empty
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ Class of checks that can be run on value entries
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
from typing import TypedDict
|
from typing import TypedDict
|
||||||
from corelibs.check_handling.regex_constants import EMAIL_REGEX_BASIC
|
from corelibs.check_handling.regex_constants import (
|
||||||
|
EMAIL_BASIC_REGEX, DOMAIN_WITH_LOCALHOST_REGEX, DOMAIN_WITH_LOCALHOST_PORT_REGEX, DOMAIN_REGEX
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class SettingsLoaderCheckValue(TypedDict):
|
class SettingsLoaderCheckValue(TypedDict):
|
||||||
@@ -45,10 +47,34 @@ class SettingsLoaderCheck:
|
|||||||
},
|
},
|
||||||
# This does a baisc email check, only alphanumeric with special characters
|
# This does a baisc email check, only alphanumeric with special characters
|
||||||
"string.email.basic": {
|
"string.email.basic": {
|
||||||
"regex": EMAIL_REGEX_BASIC,
|
"regex": EMAIL_BASIC_REGEX,
|
||||||
"regex_clean": None,
|
"regex_clean": None,
|
||||||
"replace": "",
|
"replace": "",
|
||||||
},
|
},
|
||||||
|
# Domain check, including localhost no port
|
||||||
|
"string.domain.with-localhost": {
|
||||||
|
"regex": DOMAIN_WITH_LOCALHOST_REGEX,
|
||||||
|
"regex_clean": None,
|
||||||
|
"replace": "",
|
||||||
|
},
|
||||||
|
# Domain check, with localhost and port
|
||||||
|
"string.domain.with-localhost.port": {
|
||||||
|
"regex": DOMAIN_WITH_LOCALHOST_PORT_REGEX,
|
||||||
|
"regex_clean": None,
|
||||||
|
"replace": "",
|
||||||
|
},
|
||||||
|
# Domain check, no pure localhost allowed
|
||||||
|
"string.domain": {
|
||||||
|
"regex": DOMAIN_REGEX,
|
||||||
|
"regex_clean": None,
|
||||||
|
"replace": "",
|
||||||
|
},
|
||||||
|
# Basic date check, does not validate date itself
|
||||||
|
"string.date": {
|
||||||
|
"regex": r"^\d{4}[/-]\d{1,2}[/-]\d{1,2}$",
|
||||||
|
"regex_clean": None,
|
||||||
|
"replace": "",
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -3,26 +3,36 @@ Dict helpers
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
from typing import Any
|
from typing import TypeAlias, Union, Dict, List, Any, cast
|
||||||
|
|
||||||
|
# definitions for the mask run below
|
||||||
|
MaskableValue: TypeAlias = Union[str, int, float, bool, None]
|
||||||
|
NestedDict: TypeAlias = Dict[str, Union[MaskableValue, List[Any], 'NestedDict']]
|
||||||
|
ProcessableValue: TypeAlias = Union[MaskableValue, List[Any], NestedDict]
|
||||||
|
|
||||||
|
|
||||||
def mask(
|
def mask(
|
||||||
data_set: dict[str, str],
|
data_set: dict[str, Any],
|
||||||
mask_keys: list[str] | None = None,
|
mask_keys: list[str] | None = None,
|
||||||
mask_str: str = "***",
|
mask_str: str = "***",
|
||||||
|
mask_str_edges: str = '_',
|
||||||
skip: bool = False
|
skip: bool = False
|
||||||
) -> dict[str, str]:
|
) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
mask data for output
|
mask data for output
|
||||||
Checks if mask_keys list exist in any key in the data set either from the start or at the end
|
Checks if mask_keys list exist in any key in the data set either from the start or at the end
|
||||||
|
|
||||||
|
Use the mask_str_edges to define how searches inside a string should work. Default it must start
|
||||||
|
and end with '_', remove to search string in string
|
||||||
|
|
||||||
Arguments:
|
Arguments:
|
||||||
data_set {dict[str, str]} -- _description_
|
data_set {dict[str, str]} -- _description_
|
||||||
|
|
||||||
Keyword Arguments:
|
Keyword Arguments:
|
||||||
mask_keys {list[str] | None} -- _description_ (default: {None})
|
mask_keys {list[str] | None} -- _description_ (default: {None})
|
||||||
mask_str {str} -- _description_ (default: {"***"})
|
mask_str {str} -- _description_ (default: {"***"})
|
||||||
skip {bool} -- _description_ (default: {False})
|
mask_str_edges {str} -- _description_ (default: {"_"})
|
||||||
|
skip {bool} -- if set to true skip (default: {False})
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict[str, str] -- _description_
|
dict[str, str] -- _description_
|
||||||
@@ -30,29 +40,46 @@ def mask(
|
|||||||
if skip is True:
|
if skip is True:
|
||||||
return data_set
|
return data_set
|
||||||
if mask_keys is None:
|
if mask_keys is None:
|
||||||
mask_keys = ["password", "secret"]
|
mask_keys = ["encryption", "password", "secret"]
|
||||||
|
else:
|
||||||
|
# make sure it is lower case
|
||||||
|
mask_keys = [mask_key.lower() for mask_key in mask_keys]
|
||||||
|
|
||||||
|
def should_mask_key(key: str) -> bool:
|
||||||
|
"""Check if a key should be masked"""
|
||||||
|
__key_lower = key.lower()
|
||||||
|
return any(
|
||||||
|
__key_lower.startswith(mask_key) or
|
||||||
|
__key_lower.endswith(mask_key) or
|
||||||
|
f"{mask_str_edges}{mask_key}{mask_str_edges}" in __key_lower
|
||||||
|
for mask_key in mask_keys
|
||||||
|
)
|
||||||
|
|
||||||
|
def mask_recursive(obj: ProcessableValue) -> ProcessableValue:
|
||||||
|
"""Recursively mask values in nested structures"""
|
||||||
|
if isinstance(obj, dict):
|
||||||
|
return {
|
||||||
|
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
|
||||||
|
for key, value in obj.items()
|
||||||
|
}
|
||||||
|
if isinstance(obj, list):
|
||||||
|
return [mask_recursive(item) for item in obj]
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def mask_value(value: Any) -> Any:
|
||||||
|
"""Handle masking based on value type"""
|
||||||
|
if isinstance(value, list):
|
||||||
|
# Mask each individual value in the list
|
||||||
|
return [mask_str for _ in cast('list[Any]', value)]
|
||||||
|
if isinstance(value, dict):
|
||||||
|
# Recursively process the dictionary instead of masking the whole thing
|
||||||
|
return mask_recursive(cast('ProcessableValue', value))
|
||||||
|
# Mask primitive values
|
||||||
|
return mask_str
|
||||||
|
|
||||||
return {
|
return {
|
||||||
key: mask_str
|
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
|
||||||
if any(key.startswith(mask_key) or key.endswith(mask_key) for mask_key in mask_keys) else value
|
|
||||||
for key, value in data_set.items()
|
for key, value in data_set.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def set_entry(dict_set: dict[str, Any], key: str, value_set: Any) -> dict[str, Any]:
|
|
||||||
"""
|
|
||||||
set a new entry in the dict set
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
key {str} -- _description_
|
|
||||||
dict_set {dict[str, Any]} -- _description_
|
|
||||||
value_set {Any} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict[str, Any] -- _description_
|
|
||||||
"""
|
|
||||||
if not dict_set.get(key):
|
|
||||||
dict_set[key] = {}
|
|
||||||
dict_set[key] = value_set
|
|
||||||
return dict_set
|
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -20,12 +20,7 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
# MARK: Log settings TypedDict
|
# MARK: Log settings TypedDict
|
||||||
class LogSettings(TypedDict):
|
class LogSettings(TypedDict):
|
||||||
"""
|
"""log settings, for Log setup"""
|
||||||
log settings
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
TypedDict {_type_} -- _description_
|
|
||||||
"""
|
|
||||||
log_level_console: LoggingLevel
|
log_level_console: LoggingLevel
|
||||||
log_level_file: LoggingLevel
|
log_level_file: LoggingLevel
|
||||||
console_enabled: bool
|
console_enabled: bool
|
||||||
@@ -35,6 +30,12 @@ class LogSettings(TypedDict):
|
|||||||
log_queue: 'Queue[str] | None'
|
log_queue: 'Queue[str] | None'
|
||||||
|
|
||||||
|
|
||||||
|
class LoggerInit(TypedDict):
|
||||||
|
"""for Logger init"""
|
||||||
|
logger: logging.Logger
|
||||||
|
log_queue: 'Queue[str] | None'
|
||||||
|
|
||||||
|
|
||||||
# MARK: Custom color filter
|
# MARK: Custom color filter
|
||||||
class CustomConsoleFormatter(logging.Formatter):
|
class CustomConsoleFormatter(logging.Formatter):
|
||||||
"""
|
"""
|
||||||
@@ -73,299 +74,49 @@ class CustomConsoleFormatter(logging.Formatter):
|
|||||||
message = super().format(record)
|
message = super().format(record)
|
||||||
return f"{color}{message}{reset}"
|
return f"{color}{message}{reset}"
|
||||||
|
|
||||||
|
|
||||||
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
|
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
|
||||||
# hasattr(record, 'stack_trace')
|
# hasattr(record, 'stack_trace')
|
||||||
|
|
||||||
|
|
||||||
# MARK: Log class
|
class CustomHandlerFilter(logging.Filter):
|
||||||
class Log:
|
|
||||||
"""
|
"""
|
||||||
logger setup
|
Add a custom handler for filtering
|
||||||
|
"""
|
||||||
|
HANDLER_NAME_FILTER_EXCEPTION: str = 'console'
|
||||||
|
|
||||||
|
def __init__(self, handler_name: str, filter_exceptions: bool = False):
|
||||||
|
super().__init__(name=handler_name)
|
||||||
|
self.handler_name = handler_name
|
||||||
|
self.filter_exceptions = filter_exceptions
|
||||||
|
|
||||||
|
def filter(self, record: logging.LogRecord) -> bool:
|
||||||
|
# if console and exception do not show
|
||||||
|
if self.handler_name == self.HANDLER_NAME_FILTER_EXCEPTION and self.filter_exceptions:
|
||||||
|
return record.levelname != "EXCEPTION"
|
||||||
|
# if cnosole entry is true and traget file filter
|
||||||
|
if hasattr(record, 'console') and getattr(record, 'console') is True and self.handler_name == 'file':
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
# def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||||
|
# return record.levelname != "EXCEPTION"
|
||||||
|
|
||||||
|
|
||||||
|
# MARK: Parent class
|
||||||
|
class LogParent:
|
||||||
|
"""
|
||||||
|
Parent class with general methods
|
||||||
|
used by Log and Logger
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# spacer lenght characters and the character
|
# spacer lenght characters and the character
|
||||||
SPACER_CHAR: str = '='
|
SPACER_CHAR: str = '='
|
||||||
SPACER_LENGTH: int = 32
|
SPACER_LENGTH: int = 32
|
||||||
# default logging level
|
|
||||||
DEFAULT_LOG_LEVEL: LoggingLevel = LoggingLevel.WARNING
|
|
||||||
DEFAULT_LOG_LEVEL_FILE: LoggingLevel = LoggingLevel.DEBUG
|
|
||||||
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
|
|
||||||
# default settings
|
|
||||||
DEFAULT_LOG_SETTINGS: LogSettings = {
|
|
||||||
"log_level_console": LoggingLevel.WARNING,
|
|
||||||
"log_level_file": LoggingLevel.DEBUG,
|
|
||||||
"console_enabled": True,
|
|
||||||
"console_color_output_enabled": True,
|
|
||||||
"add_start_info": True,
|
|
||||||
"add_end_info": False,
|
|
||||||
"log_queue": None,
|
|
||||||
}
|
|
||||||
|
|
||||||
# MARK: constructor
|
def __init__(self):
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
log_path: Path,
|
|
||||||
log_name: str,
|
|
||||||
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
|
|
||||||
other_handlers: dict[str, Any] | None = None
|
|
||||||
):
|
|
||||||
# add new level for alert, emergecny and exception
|
|
||||||
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
|
|
||||||
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
|
|
||||||
logging.addLevelName(LoggingLevel.EXCEPTION.value, LoggingLevel.EXCEPTION.name)
|
|
||||||
# parse the logging settings
|
|
||||||
self.log_settings = self.__parse_log_settings(log_settings)
|
|
||||||
# if path, set log name with .log
|
|
||||||
# if log name with .log, strip .log for naming
|
|
||||||
if log_path.is_dir():
|
|
||||||
__log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name)
|
|
||||||
if not log_name.endswith('.log'):
|
|
||||||
log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log'))
|
|
||||||
else:
|
|
||||||
log_path = log_path.joinpath(__log_file_name)
|
|
||||||
elif not log_path.suffix == '.log':
|
|
||||||
# add .log if the path is a file but without .log
|
|
||||||
log_path = log_path.with_suffix('.log')
|
|
||||||
# stip .log from the log name if set
|
|
||||||
if not log_name.endswith('.log'):
|
|
||||||
log_name = Path(log_name).stem
|
|
||||||
# general log name
|
|
||||||
self.log_name = log_name
|
|
||||||
|
|
||||||
self.log_queue: 'Queue[str] | None' = None
|
|
||||||
self.listener: logging.handlers.QueueListener | None = None
|
|
||||||
self.logger: logging.Logger
|
self.logger: logging.Logger
|
||||||
|
self.log_queue: 'Queue[str] | None' = None
|
||||||
# setup handlers
|
|
||||||
# NOTE if console with color is set first, some of the color formatting is set
|
|
||||||
# in the file writer too, for the ones where color is set BEFORE the format
|
|
||||||
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
|
|
||||||
self.handlers: dict[str, Any] = {}
|
self.handlers: dict[str, Any] = {}
|
||||||
self.add_handler('file_handler', self.__create_time_rotating_file_handler(
|
|
||||||
self.log_settings['log_level_file'], log_path)
|
|
||||||
)
|
|
||||||
if self.log_settings['console_enabled']:
|
|
||||||
# console
|
|
||||||
self.add_handler('stream_handler', self.__create_console_handler(
|
|
||||||
self.log_settings['log_level_console'])
|
|
||||||
)
|
|
||||||
# add other handlers,
|
|
||||||
if other_handlers is not None:
|
|
||||||
for handler_key, handler in other_handlers.items():
|
|
||||||
self.add_handler(handler_key, handler)
|
|
||||||
# init listener if we have a log_queue set
|
|
||||||
self.__init_listener(self.log_settings['log_queue'])
|
|
||||||
|
|
||||||
# overall logger start
|
|
||||||
self.__init_log(log_name)
|
|
||||||
# if requests set a start log
|
|
||||||
if self.log_settings['add_start_info'] is True:
|
|
||||||
self.break_line('START')
|
|
||||||
|
|
||||||
# MARK: deconstructor
|
|
||||||
def __del__(self):
|
|
||||||
"""
|
|
||||||
Call when class is destroyed, make sure the listender is closed or else we throw a thread error
|
|
||||||
"""
|
|
||||||
if self.log_settings['add_end_info']:
|
|
||||||
self.break_line('END')
|
|
||||||
self.stop_listener()
|
|
||||||
|
|
||||||
# MARK: parse log settings
|
|
||||||
def __parse_log_settings(
|
|
||||||
self,
|
|
||||||
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None
|
|
||||||
) -> LogSettings:
|
|
||||||
# skip with defaul it not set
|
|
||||||
if log_settings is None:
|
|
||||||
return self.DEFAULT_LOG_SETTINGS
|
|
||||||
# check entries
|
|
||||||
default_log_settings = self.DEFAULT_LOG_SETTINGS
|
|
||||||
# check log levels
|
|
||||||
for __log_entry in ['log_level_console', 'log_level_file']:
|
|
||||||
if log_settings.get(__log_entry) is None:
|
|
||||||
continue
|
|
||||||
# if not valid reset to default, if not in default set to WARNING
|
|
||||||
if not self.validate_log_level(__log_level := log_settings.get(__log_entry, '')):
|
|
||||||
__log_level = self.DEFAULT_LOG_SETTINGS.get(
|
|
||||||
__log_entry, self.DEFAULT_LOG_LEVEL
|
|
||||||
)
|
|
||||||
default_log_settings[__log_entry] = LoggingLevel.from_any(__log_level)
|
|
||||||
# check bool
|
|
||||||
for __log_entry in [
|
|
||||||
"console_enabled",
|
|
||||||
"console_color_output_enabled",
|
|
||||||
"add_start_info",
|
|
||||||
"add_end_info",
|
|
||||||
]:
|
|
||||||
if log_settings.get(__log_entry) is None:
|
|
||||||
continue
|
|
||||||
if not isinstance(__setting := log_settings.get(__log_entry, ''), bool):
|
|
||||||
__setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
|
|
||||||
default_log_settings[__log_entry] = __setting
|
|
||||||
# check log queue
|
|
||||||
__setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue'])
|
|
||||||
if __setting is not None:
|
|
||||||
__setting = cast('Queue[str]', __setting)
|
|
||||||
default_log_settings['log_queue'] = __setting
|
|
||||||
return default_log_settings
|
|
||||||
|
|
||||||
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
|
||||||
return record.levelname != "EXCEPTION"
|
|
||||||
|
|
||||||
# MARK: add a handler
|
|
||||||
def add_handler(
|
|
||||||
self,
|
|
||||||
handler_name: str,
|
|
||||||
handler: Any
|
|
||||||
) -> bool:
|
|
||||||
"""
|
|
||||||
Add a log handler to the handlers dict
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
handler_name {str} -- _description_
|
|
||||||
handler {Any} -- _description_
|
|
||||||
"""
|
|
||||||
if self.handlers.get(handler_name):
|
|
||||||
return False
|
|
||||||
if self.listener is not None or hasattr(self, 'logger'):
|
|
||||||
raise ValueError(
|
|
||||||
f"Cannot add handler {handler_name}: {handler.get_name()} because logger is already running"
|
|
||||||
)
|
|
||||||
# TODO: handler must be some handler type, how to check?
|
|
||||||
self.handlers[handler_name] = handler
|
|
||||||
return True
|
|
||||||
|
|
||||||
# MARK: console handler
|
|
||||||
def __create_console_handler(
|
|
||||||
self, log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
|
|
||||||
) -> logging.StreamHandler[TextIO]:
|
|
||||||
# console logger
|
|
||||||
if not self.validate_log_level(log_level_console):
|
|
||||||
log_level_console = self.DEFAULT_LOG_LEVEL_CONSOLE
|
|
||||||
console_handler = logging.StreamHandler()
|
|
||||||
# format layouts
|
|
||||||
format_string = (
|
|
||||||
'[%(asctime)s.%(msecs)03d] '
|
|
||||||
'[%(name)s] '
|
|
||||||
'[%(filename)s:%(funcName)s:%(lineno)d] '
|
|
||||||
'<%(levelname)s> '
|
|
||||||
'%(message)s'
|
|
||||||
)
|
|
||||||
format_date = "%Y-%m-%d %H:%M:%S"
|
|
||||||
# color or not
|
|
||||||
if self.log_settings['console_color_output_enabled']:
|
|
||||||
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
|
|
||||||
else:
|
|
||||||
formatter_console = logging.Formatter(format_string, datefmt=format_date)
|
|
||||||
console_handler.setLevel(log_level_console.name)
|
|
||||||
console_handler.set_name('console')
|
|
||||||
# do not show exceptions logs on console
|
|
||||||
if filter_exceptions:
|
|
||||||
console_handler.addFilter(self.__filter_exceptions)
|
|
||||||
console_handler.setFormatter(formatter_console)
|
|
||||||
return console_handler
|
|
||||||
|
|
||||||
# MARK: file handler
|
|
||||||
def __create_time_rotating_file_handler(
|
|
||||||
self, log_level_file: LoggingLevel, log_path: Path,
|
|
||||||
when: str = "D", interval: int = 1, backup_count: int = 0
|
|
||||||
) -> logging.handlers.TimedRotatingFileHandler:
|
|
||||||
# file logger
|
|
||||||
# when: S/M/H/D/W0-W6/midnight
|
|
||||||
# interval: how many, 1D = every day
|
|
||||||
# backup_count: how many old to keep, 0 = all
|
|
||||||
if not self.validate_log_level(log_level_file):
|
|
||||||
log_level_file = self.DEFAULT_LOG_LEVEL_FILE
|
|
||||||
file_handler = logging.handlers.TimedRotatingFileHandler(
|
|
||||||
filename=log_path,
|
|
||||||
encoding="utf-8",
|
|
||||||
when=when,
|
|
||||||
interval=interval,
|
|
||||||
backupCount=backup_count
|
|
||||||
)
|
|
||||||
formatter_file_handler = logging.Formatter(
|
|
||||||
(
|
|
||||||
# time stamp
|
|
||||||
'[%(asctime)s.%(msecs)03d] '
|
|
||||||
# log name
|
|
||||||
'[%(name)s] '
|
|
||||||
# filename + pid
|
|
||||||
'[%(filename)s:%(process)d] '
|
|
||||||
# path + func + line number
|
|
||||||
'[%(pathname)s:%(funcName)s:%(lineno)d] '
|
|
||||||
# error level
|
|
||||||
'<%(levelname)s> '
|
|
||||||
# message
|
|
||||||
'%(message)s'
|
|
||||||
),
|
|
||||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
|
||||||
)
|
|
||||||
file_handler.set_name('file_timed_rotate')
|
|
||||||
file_handler.setLevel(log_level_file.name)
|
|
||||||
file_handler.setFormatter(formatter_file_handler)
|
|
||||||
return file_handler
|
|
||||||
|
|
||||||
# MARK: init listener
|
|
||||||
def __init_listener(self, log_queue: 'Queue[str] | None' = None):
|
|
||||||
"""
|
|
||||||
If we have a Queue option start the logging queue
|
|
||||||
|
|
||||||
Keyword Arguments:
|
|
||||||
log_queue {Queue[str] | None} -- _description_ (default: {None})
|
|
||||||
"""
|
|
||||||
if log_queue is None:
|
|
||||||
return
|
|
||||||
self.log_queue = log_queue
|
|
||||||
self.listener = logging.handlers.QueueListener(
|
|
||||||
self.log_queue,
|
|
||||||
*self.handlers.values(),
|
|
||||||
respect_handler_level=True
|
|
||||||
)
|
|
||||||
self.listener.start()
|
|
||||||
|
|
||||||
# MARK: init main log
|
|
||||||
def __init_log(self, log_name: str) -> None:
|
|
||||||
"""
|
|
||||||
Initialize the main loggger
|
|
||||||
"""
|
|
||||||
queue_handler: logging.handlers.QueueHandler | None = None
|
|
||||||
if self.log_queue is not None:
|
|
||||||
queue_handler = logging.handlers.QueueHandler(self.log_queue)
|
|
||||||
# overall logger settings
|
|
||||||
self.logger = logging.getLogger(log_name)
|
|
||||||
# add all the handlers
|
|
||||||
if queue_handler is None:
|
|
||||||
for handler in self.handlers.values():
|
|
||||||
self.logger.addHandler(handler)
|
|
||||||
else:
|
|
||||||
self.logger.addHandler(queue_handler)
|
|
||||||
# set maximum logging level for all logging output
|
|
||||||
# log level filtering is done per handler
|
|
||||||
self.logger.setLevel(logging.DEBUG)
|
|
||||||
# short name
|
|
||||||
self.lg = self.logger
|
|
||||||
self.l = self.logger
|
|
||||||
|
|
||||||
# MARK: init logger for Fork/Thread
|
|
||||||
@staticmethod
|
|
||||||
def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger:
|
|
||||||
"""
|
|
||||||
This initalizes a logger that can be used in pool/thread queue calls
|
|
||||||
"""
|
|
||||||
queue_handler = logging.handlers.QueueHandler(log_queue)
|
|
||||||
# getLogger call MUST be WITHOUT and logger name
|
|
||||||
root_logger = logging.getLogger()
|
|
||||||
# base logging level, filtering is done in the handlers
|
|
||||||
root_logger.setLevel(logging.DEBUG)
|
|
||||||
root_logger.handlers.clear()
|
|
||||||
root_logger.addHandler(queue_handler)
|
|
||||||
|
|
||||||
# for debug only
|
|
||||||
root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers)
|
|
||||||
|
|
||||||
return root_logger
|
|
||||||
|
|
||||||
# FIXME: we need to add a custom formater to add stack level listing if we want to
|
# FIXME: we need to add a custom formater to add stack level listing if we want to
|
||||||
# Important note, although they exist, it is recommended to use self.logger.NAME directly
|
# Important note, although they exist, it is recommended to use self.logger.NAME directly
|
||||||
@@ -454,7 +205,11 @@ class Log:
|
|||||||
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
|
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
# MARK: EXCEPTION: 70
|
# MARK: EXCEPTION: 70
|
||||||
def exception(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
|
def exception(
|
||||||
|
self,
|
||||||
|
msg: object, *args: object, extra: MutableMapping[str, object] | None = None,
|
||||||
|
log_error: bool = True
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
log on exceotion level, this is log.exception, but logs with a new level
|
log on exceotion level, this is log.exception, but logs with a new level
|
||||||
|
|
||||||
@@ -462,15 +217,21 @@ class Log:
|
|||||||
msg (object): _description_
|
msg (object): _description_
|
||||||
*args (object): arguments for msg
|
*args (object): arguments for msg
|
||||||
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
||||||
|
log_error: (bool): If set to false will not write additional error message for console (Default True)
|
||||||
"""
|
"""
|
||||||
if not hasattr(self, 'logger'):
|
if not hasattr(self, 'logger'):
|
||||||
raise ValueError('Logger is not yet initialized')
|
raise ValueError('Logger is not yet initialized')
|
||||||
if extra is None:
|
if extra is None:
|
||||||
extra = {}
|
extra = {}
|
||||||
extra['stack_trace'] = traceback_call_str(start=3)
|
extra['stack_trace'] = traceback_call_str(start=3)
|
||||||
|
# write to console first with extra flag for filtering in file
|
||||||
|
if log_error:
|
||||||
|
self.logger.log(
|
||||||
|
LoggingLevel.ERROR.value,
|
||||||
|
f"<=EXCEPTION> {msg}", *args, extra=dict(extra) | {'console': True}, stacklevel=2
|
||||||
|
)
|
||||||
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
|
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
|
||||||
|
|
||||||
# MARK: break line
|
|
||||||
def break_line(self, info: str = "BREAK"):
|
def break_line(self, info: str = "BREAK"):
|
||||||
"""
|
"""
|
||||||
add a break line as info level
|
add a break line as info level
|
||||||
@@ -494,7 +255,7 @@ class Log:
|
|||||||
Returns:
|
Returns:
|
||||||
bool -- _description_
|
bool -- _description_
|
||||||
"""
|
"""
|
||||||
if not self.listener or not self.log_queue:
|
if not self.log_queue:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -516,14 +277,6 @@ class Log:
|
|||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def stop_listener(self):
|
|
||||||
"""
|
|
||||||
stop the listener
|
|
||||||
"""
|
|
||||||
if self.listener is not None:
|
|
||||||
self.flush()
|
|
||||||
self.listener.stop()
|
|
||||||
|
|
||||||
# MARK: log level handling
|
# MARK: log level handling
|
||||||
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
|
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
|
||||||
"""
|
"""
|
||||||
@@ -564,7 +317,7 @@ class Log:
|
|||||||
LoggingLevel -- _description_
|
LoggingLevel -- _description_
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
return self.handlers[handler_name]
|
return LoggingLevel.from_any(self.handlers[handler_name].level)
|
||||||
except IndexError:
|
except IndexError:
|
||||||
return LoggingLevel.NOTSET
|
return LoggingLevel.NOTSET
|
||||||
|
|
||||||
@@ -602,4 +355,334 @@ class Log:
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
|
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
|
||||||
|
|
||||||
|
|
||||||
|
# MARK: Logger
|
||||||
|
class Logger(LogParent):
|
||||||
|
"""
|
||||||
|
The class we can pass on to other clases without re-init the class itself
|
||||||
|
NOTE: if no queue object is handled over the logging level change might not take immediate effect
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, logger_settings: LoggerInit):
|
||||||
|
LogParent.__init__(self)
|
||||||
|
self.logger = logger_settings['logger']
|
||||||
|
self.lg = self.logger
|
||||||
|
self.l = self.logger
|
||||||
|
self.handlers = {str(_handler.name): _handler for _handler in self.logger.handlers}
|
||||||
|
self.log_queue = logger_settings['log_queue']
|
||||||
|
|
||||||
|
|
||||||
|
# MARK: LogSetup class
|
||||||
|
class Log(LogParent):
|
||||||
|
"""
|
||||||
|
logger setup
|
||||||
|
"""
|
||||||
|
|
||||||
|
# spacer lenght characters and the character
|
||||||
|
SPACER_CHAR: str = '='
|
||||||
|
SPACER_LENGTH: int = 32
|
||||||
|
# default logging level
|
||||||
|
DEFAULT_LOG_LEVEL: LoggingLevel = LoggingLevel.WARNING
|
||||||
|
DEFAULT_LOG_LEVEL_FILE: LoggingLevel = LoggingLevel.DEBUG
|
||||||
|
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
|
||||||
|
# default settings
|
||||||
|
DEFAULT_LOG_SETTINGS: LogSettings = {
|
||||||
|
"log_level_console": DEFAULT_LOG_LEVEL_CONSOLE,
|
||||||
|
"log_level_file": DEFAULT_LOG_LEVEL_FILE,
|
||||||
|
"console_enabled": True,
|
||||||
|
"console_color_output_enabled": True,
|
||||||
|
"add_start_info": True,
|
||||||
|
"add_end_info": False,
|
||||||
|
"log_queue": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# MARK: constructor
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
log_path: Path,
|
||||||
|
log_name: str,
|
||||||
|
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
|
||||||
|
other_handlers: dict[str, Any] | None = None
|
||||||
|
):
|
||||||
|
LogParent.__init__(self)
|
||||||
|
# add new level for alert, emergecny and exception
|
||||||
|
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
|
||||||
|
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
|
||||||
|
logging.addLevelName(LoggingLevel.EXCEPTION.value, LoggingLevel.EXCEPTION.name)
|
||||||
|
# parse the logging settings
|
||||||
|
self.log_settings = self.__parse_log_settings(log_settings)
|
||||||
|
# if path, set log name with .log
|
||||||
|
# if log name with .log, strip .log for naming
|
||||||
|
if log_path.is_dir():
|
||||||
|
__log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name)
|
||||||
|
if not log_name.endswith('.log'):
|
||||||
|
log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log'))
|
||||||
|
else:
|
||||||
|
log_path = log_path.joinpath(__log_file_name)
|
||||||
|
elif not log_path.suffix == '.log':
|
||||||
|
# add .log if the path is a file but without .log
|
||||||
|
log_path = log_path.with_suffix('.log')
|
||||||
|
# stip .log from the log name if set
|
||||||
|
if not log_name.endswith('.log'):
|
||||||
|
log_name = Path(log_name).stem
|
||||||
|
# general log name
|
||||||
|
self.log_name = log_name
|
||||||
|
|
||||||
|
self.log_queue: 'Queue[str] | None' = None
|
||||||
|
self.listener: logging.handlers.QueueListener | None = None
|
||||||
|
self.logger: logging.Logger
|
||||||
|
|
||||||
|
# setup handlers
|
||||||
|
# NOTE if console with color is set first, some of the color formatting is set
|
||||||
|
# in the file writer too, for the ones where color is set BEFORE the format
|
||||||
|
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
|
||||||
|
self.handlers: dict[str, Any] = {}
|
||||||
|
self.add_handler('file_handler', self.__create_timed_rotating_file_handler(
|
||||||
|
'file_handler', self.log_settings['log_level_file'], log_path)
|
||||||
|
)
|
||||||
|
if self.log_settings['console_enabled']:
|
||||||
|
# console
|
||||||
|
self.add_handler('stream_handler', self.__create_console_handler(
|
||||||
|
'stream_handler', self.log_settings['log_level_console'])
|
||||||
|
)
|
||||||
|
# add other handlers,
|
||||||
|
if other_handlers is not None:
|
||||||
|
for handler_key, handler in other_handlers.items():
|
||||||
|
self.add_handler(handler_key, handler)
|
||||||
|
# init listener if we have a log_queue set
|
||||||
|
self.__init_listener(self.log_settings['log_queue'])
|
||||||
|
|
||||||
|
# overall logger start
|
||||||
|
self.__init_log(log_name)
|
||||||
|
# if requests set a start log
|
||||||
|
if self.log_settings['add_start_info'] is True:
|
||||||
|
self.break_line('START')
|
||||||
|
|
||||||
|
# MARK: deconstructor
|
||||||
|
def __del__(self):
|
||||||
|
"""
|
||||||
|
Call when class is destroyed, make sure the listender is closed or else we throw a thread error
|
||||||
|
"""
|
||||||
|
if self.log_settings['add_end_info']:
|
||||||
|
self.break_line('END')
|
||||||
|
self.stop_listener()
|
||||||
|
|
||||||
|
# MARK: parse log settings
|
||||||
|
def __parse_log_settings(
|
||||||
|
self,
|
||||||
|
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None
|
||||||
|
) -> LogSettings:
|
||||||
|
# skip with defaul it not set
|
||||||
|
if log_settings is None:
|
||||||
|
return self.DEFAULT_LOG_SETTINGS
|
||||||
|
# check entries
|
||||||
|
default_log_settings = self.DEFAULT_LOG_SETTINGS
|
||||||
|
# check log levels
|
||||||
|
for __log_entry in ['log_level_console', 'log_level_file']:
|
||||||
|
if log_settings.get(__log_entry) is None:
|
||||||
|
continue
|
||||||
|
# if not valid reset to default, if not in default set to WARNING
|
||||||
|
if not self.validate_log_level(__log_level := log_settings.get(__log_entry, '')):
|
||||||
|
__log_level = self.DEFAULT_LOG_SETTINGS.get(
|
||||||
|
__log_entry, self.DEFAULT_LOG_LEVEL
|
||||||
|
)
|
||||||
|
default_log_settings[__log_entry] = LoggingLevel.from_any(__log_level)
|
||||||
|
# check bool
|
||||||
|
for __log_entry in [
|
||||||
|
"console_enabled",
|
||||||
|
"console_color_output_enabled",
|
||||||
|
"add_start_info",
|
||||||
|
"add_end_info",
|
||||||
|
]:
|
||||||
|
if log_settings.get(__log_entry) is None:
|
||||||
|
continue
|
||||||
|
if not isinstance(__setting := log_settings.get(__log_entry, ''), bool):
|
||||||
|
__setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
|
||||||
|
default_log_settings[__log_entry] = __setting
|
||||||
|
# check log queue
|
||||||
|
__setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue'])
|
||||||
|
if __setting is not None:
|
||||||
|
__setting = cast('Queue[str]', __setting)
|
||||||
|
default_log_settings['log_queue'] = __setting
|
||||||
|
return default_log_settings
|
||||||
|
|
||||||
|
# def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||||
|
# return record.levelname != "EXCEPTION"
|
||||||
|
|
||||||
|
# MARK: add a handler
|
||||||
|
def add_handler(
|
||||||
|
self,
|
||||||
|
handler_name: str,
|
||||||
|
handler: Any
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Add a log handler to the handlers dict
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
handler_name {str} -- _description_
|
||||||
|
handler {Any} -- _description_
|
||||||
|
"""
|
||||||
|
if self.handlers.get(handler_name):
|
||||||
|
return False
|
||||||
|
if self.listener is not None or hasattr(self, 'logger'):
|
||||||
|
raise ValueError(
|
||||||
|
f"Cannot add handler {handler_name}: {handler.get_name()} because logger is already running"
|
||||||
|
)
|
||||||
|
# TODO: handler must be some handler type, how to check?
|
||||||
|
self.handlers[handler_name] = handler
|
||||||
|
return True
|
||||||
|
|
||||||
|
# MARK: console handler
|
||||||
|
def __create_console_handler(
|
||||||
|
self, handler_name: str,
|
||||||
|
log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
|
||||||
|
) -> logging.StreamHandler[TextIO]:
|
||||||
|
# console logger
|
||||||
|
if not self.validate_log_level(log_level_console):
|
||||||
|
log_level_console = self.DEFAULT_LOG_LEVEL_CONSOLE
|
||||||
|
console_handler = logging.StreamHandler()
|
||||||
|
# format layouts
|
||||||
|
format_string = (
|
||||||
|
'[%(asctime)s.%(msecs)03d] '
|
||||||
|
'[%(name)s] '
|
||||||
|
'[%(filename)s:%(funcName)s:%(lineno)d] '
|
||||||
|
'<%(levelname)s> '
|
||||||
|
'%(message)s'
|
||||||
|
)
|
||||||
|
format_date = "%Y-%m-%d %H:%M:%S"
|
||||||
|
# color or not
|
||||||
|
if self.log_settings['console_color_output_enabled']:
|
||||||
|
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
|
||||||
|
else:
|
||||||
|
formatter_console = logging.Formatter(format_string, datefmt=format_date)
|
||||||
|
console_handler.set_name(handler_name)
|
||||||
|
console_handler.setLevel(log_level_console.name)
|
||||||
|
# do not show exceptions logs on console
|
||||||
|
console_handler.addFilter(CustomHandlerFilter('console', filter_exceptions))
|
||||||
|
console_handler.setFormatter(formatter_console)
|
||||||
|
return console_handler
|
||||||
|
|
||||||
|
# MARK: file handler
|
||||||
|
def __create_timed_rotating_file_handler(
|
||||||
|
self, handler_name: str,
|
||||||
|
log_level_file: LoggingLevel, log_path: Path,
|
||||||
|
when: str = "D", interval: int = 1, backup_count: int = 0
|
||||||
|
) -> logging.handlers.TimedRotatingFileHandler:
|
||||||
|
# file logger
|
||||||
|
# when: S/M/H/D/W0-W6/midnight
|
||||||
|
# interval: how many, 1D = every day
|
||||||
|
# backup_count: how many old to keep, 0 = all
|
||||||
|
if not self.validate_log_level(log_level_file):
|
||||||
|
log_level_file = self.DEFAULT_LOG_LEVEL_FILE
|
||||||
|
file_handler = logging.handlers.TimedRotatingFileHandler(
|
||||||
|
filename=log_path,
|
||||||
|
encoding="utf-8",
|
||||||
|
when=when,
|
||||||
|
interval=interval,
|
||||||
|
backupCount=backup_count
|
||||||
|
)
|
||||||
|
formatter_file_handler = logging.Formatter(
|
||||||
|
(
|
||||||
|
# time stamp
|
||||||
|
'[%(asctime)s.%(msecs)03d] '
|
||||||
|
# log name
|
||||||
|
'[%(name)s] '
|
||||||
|
# filename + pid
|
||||||
|
'[%(filename)s:%(process)d] '
|
||||||
|
# path + func + line number
|
||||||
|
'[%(pathname)s:%(funcName)s:%(lineno)d] '
|
||||||
|
# error level
|
||||||
|
'<%(levelname)s> '
|
||||||
|
# message
|
||||||
|
'%(message)s'
|
||||||
|
),
|
||||||
|
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||||
|
)
|
||||||
|
file_handler.set_name(handler_name)
|
||||||
|
file_handler.setLevel(log_level_file.name)
|
||||||
|
# do not show errors flagged with console (they are from exceptions)
|
||||||
|
file_handler.addFilter(CustomHandlerFilter('file'))
|
||||||
|
file_handler.setFormatter(formatter_file_handler)
|
||||||
|
return file_handler
|
||||||
|
|
||||||
|
# MARK: init listener
|
||||||
|
def __init_listener(self, log_queue: 'Queue[str] | None' = None):
|
||||||
|
"""
|
||||||
|
If we have a Queue option start the logging queue
|
||||||
|
|
||||||
|
Keyword Arguments:
|
||||||
|
log_queue {Queue[str] | None} -- _description_ (default: {None})
|
||||||
|
"""
|
||||||
|
if log_queue is None:
|
||||||
|
return
|
||||||
|
self.log_queue = log_queue
|
||||||
|
self.listener = logging.handlers.QueueListener(
|
||||||
|
self.log_queue,
|
||||||
|
*self.handlers.values(),
|
||||||
|
respect_handler_level=True
|
||||||
|
)
|
||||||
|
self.listener.start()
|
||||||
|
|
||||||
|
def stop_listener(self):
|
||||||
|
"""
|
||||||
|
stop the listener
|
||||||
|
"""
|
||||||
|
if self.listener is not None:
|
||||||
|
self.flush()
|
||||||
|
self.listener.stop()
|
||||||
|
|
||||||
|
# MARK: init main log
|
||||||
|
def __init_log(self, log_name: str) -> None:
|
||||||
|
"""
|
||||||
|
Initialize the main loggger
|
||||||
|
"""
|
||||||
|
queue_handler: logging.handlers.QueueHandler | None = None
|
||||||
|
if self.log_queue is not None:
|
||||||
|
queue_handler = logging.handlers.QueueHandler(self.log_queue)
|
||||||
|
# overall logger settings
|
||||||
|
self.logger = logging.getLogger(log_name)
|
||||||
|
# add all the handlers
|
||||||
|
if queue_handler is None:
|
||||||
|
for handler in self.handlers.values():
|
||||||
|
self.logger.addHandler(handler)
|
||||||
|
else:
|
||||||
|
self.logger.addHandler(queue_handler)
|
||||||
|
# set maximum logging level for all logging output
|
||||||
|
# log level filtering is done per handler
|
||||||
|
self.logger.setLevel(logging.DEBUG)
|
||||||
|
# short name
|
||||||
|
self.lg = self.logger
|
||||||
|
self.l = self.logger
|
||||||
|
|
||||||
|
# MARK: init logger for Fork/Thread
|
||||||
|
@staticmethod
|
||||||
|
def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger:
|
||||||
|
"""
|
||||||
|
This initalizes a logger that can be used in pool/thread queue calls
|
||||||
|
"""
|
||||||
|
queue_handler = logging.handlers.QueueHandler(log_queue)
|
||||||
|
# getLogger call MUST be WITHOUT and logger name
|
||||||
|
root_logger = logging.getLogger()
|
||||||
|
# base logging level, filtering is done in the handlers
|
||||||
|
root_logger.setLevel(logging.DEBUG)
|
||||||
|
root_logger.handlers.clear()
|
||||||
|
root_logger.addHandler(queue_handler)
|
||||||
|
|
||||||
|
# for debug only
|
||||||
|
root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers)
|
||||||
|
|
||||||
|
return root_logger
|
||||||
|
|
||||||
|
def get_logger_settings(self) -> LoggerInit:
|
||||||
|
"""
|
||||||
|
get the logger settings we need to init the Logger class
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
LoggerInit -- _description_
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"logger": self.logger,
|
||||||
|
"log_queue": self.log_queue
|
||||||
|
}
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -2,10 +2,12 @@
|
|||||||
Settings loader test
|
Settings loader test
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from corelibs.iterator_handling.dump_data import dump_data
|
from corelibs.debug_handling.dump_data import dump_data
|
||||||
from corelibs.logging_handling.log import Log
|
from corelibs.logging_handling.log import Log
|
||||||
from corelibs.config_handling.settings_loader import SettingsLoader
|
from corelibs.config_handling.settings_loader import SettingsLoader
|
||||||
|
from corelibs.config_handling.settings_loader_handling.settings_loader_check import SettingsLoaderCheck
|
||||||
|
|
||||||
SCRIPT_PATH: Path = Path(__file__).resolve().parent
|
SCRIPT_PATH: Path = Path(__file__).resolve().parent
|
||||||
ROOT_PATH: Path = SCRIPT_PATH
|
ROOT_PATH: Path = SCRIPT_PATH
|
||||||
@@ -18,6 +20,11 @@ def main():
|
|||||||
Main run
|
Main run
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
value = "2025/1/1"
|
||||||
|
regex_c = re.compile(SettingsLoaderCheck.CHECK_SETTINGS['string.date']['regex'], re.VERBOSE)
|
||||||
|
result = regex_c.search(value)
|
||||||
|
print(f"regex {regex_c} check against {value} -> {result}")
|
||||||
|
|
||||||
# for log testing
|
# for log testing
|
||||||
script_path: Path = Path(__file__).resolve().parent
|
script_path: Path = Path(__file__).resolve().parent
|
||||||
log = Log(
|
log = Log(
|
||||||
|
|||||||
106
test-run/iterator_handling/dict_helpers.py
Normal file
106
test-run/iterator_handling/dict_helpers.py
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
"""
|
||||||
|
Iterator helper testing
|
||||||
|
"""
|
||||||
|
|
||||||
|
from corelibs.debug_handling.dump_data import dump_data
|
||||||
|
from corelibs.iterator_handling.dict_helpers import mask
|
||||||
|
|
||||||
|
|
||||||
|
def __mask():
|
||||||
|
data = {
|
||||||
|
# "user": "john",
|
||||||
|
# "encryption_key": "Secret key",
|
||||||
|
# "ENCRYPTION.TEST": "Secret key test",
|
||||||
|
# "inside_password_test": "Hide this",
|
||||||
|
"password": ["secret1", "secret2"], # List value gets masked
|
||||||
|
# "config": {
|
||||||
|
# "db_password": {"primary": "secret", "backup": "secret2"}, # Dict value gets masked
|
||||||
|
# "api_keys": ["key1", "key2", "key3"] # List value gets masked
|
||||||
|
# },
|
||||||
|
# "items": [ # List value that doesn't get masked, but gets processed recursively
|
||||||
|
# {"name": "item1", "secret_key": "itemsecret"},
|
||||||
|
# {"name": "item2", "passwords": ["pass1", "pass2"]}
|
||||||
|
# ],
|
||||||
|
# "normal_list": ["item1", "item2", "item3"] # Normal list, not masked
|
||||||
|
}
|
||||||
|
data = {
|
||||||
|
"config": {
|
||||||
|
# "password": ["secret1", "secret2"],
|
||||||
|
# "password_other": {"password": ["secret1", "secret2"]},
|
||||||
|
# "database": {
|
||||||
|
# "host": "localhost",
|
||||||
|
# "password": "db_secret",
|
||||||
|
# "users": [
|
||||||
|
# {"name": "admin", "password": "admin123"},
|
||||||
|
# {"name": "user", "secret_key": "user456"}
|
||||||
|
# ]
|
||||||
|
# },
|
||||||
|
# "api": {
|
||||||
|
# # "endpoints": ["api1", "api2"],
|
||||||
|
# "encryption_settings": {
|
||||||
|
# "enabled": True,
|
||||||
|
# "secret": "api_secret"
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
"secret_key": "normal_value",
|
||||||
|
"api_key": "normal_value",
|
||||||
|
"my_key_value": "normal_value",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
data = {
|
||||||
|
"basic": {
|
||||||
|
"log_level_console": "DEBUG",
|
||||||
|
"log_level_file": "DEBUG",
|
||||||
|
"storage_interface": "sqlite",
|
||||||
|
"content_start_date": "2023-1-1",
|
||||||
|
"encryption_key": "ENCRYPTION_KEY"
|
||||||
|
},
|
||||||
|
"email": {
|
||||||
|
"alert_email": [
|
||||||
|
"test+z-sd@tequila.jp"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"poller": {
|
||||||
|
"max_forks": "1",
|
||||||
|
"interface": "Zac"
|
||||||
|
},
|
||||||
|
"pusher": {
|
||||||
|
"max_forks": "3",
|
||||||
|
"interface": "Screendragon"
|
||||||
|
},
|
||||||
|
"api:Zac": {
|
||||||
|
"type": "zac",
|
||||||
|
"client_id": "oro_zac_demo",
|
||||||
|
"client_secret": "CLIENT_SECRET",
|
||||||
|
"username": "zacuser",
|
||||||
|
"password": "ZACuser3",
|
||||||
|
"hostname": "e-gra2.zac.ai",
|
||||||
|
"appname": "e-gra2_api_trial",
|
||||||
|
"api_path": "b/api/v2"
|
||||||
|
},
|
||||||
|
"api:Screendragon": {
|
||||||
|
"type": "screendragon",
|
||||||
|
"client_id": "omniprostaging",
|
||||||
|
"encryption_client": "SOME_SECRET",
|
||||||
|
"client_encryption": "SOME_SECRET",
|
||||||
|
"secret_client": "SOME_SECRET",
|
||||||
|
"client_secret": "SOME_SECRET",
|
||||||
|
"hostname": "omniprostaging.screendragon.com",
|
||||||
|
"appname": "sdapi",
|
||||||
|
"api_path": "api"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = mask(data)
|
||||||
|
print(f"** In: {dump_data(data)}")
|
||||||
|
print(f"===> Masked: {dump_data(result)}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
Test: corelibs.string_handling.string_helpers
|
||||||
|
"""
|
||||||
|
__mask()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -5,7 +5,7 @@ Log logging_handling.log testing
|
|||||||
# import atexit
|
# import atexit
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
# this is for testing only
|
# this is for testing only
|
||||||
from corelibs.logging_handling.log import Log
|
from corelibs.logging_handling.log import Log, Logger
|
||||||
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
|
||||||
|
|
||||||
|
|
||||||
@@ -18,16 +18,19 @@ def main():
|
|||||||
log_path=script_path.joinpath('log', 'test.log'),
|
log_path=script_path.joinpath('log', 'test.log'),
|
||||||
log_name="Test Log",
|
log_name="Test Log",
|
||||||
log_settings={
|
log_settings={
|
||||||
# "log_level_console": 'DEBUG',
|
"log_level_console": 'DEBUG',
|
||||||
"log_level_console": None,
|
# "log_level_console": None,
|
||||||
"log_level_file": 'DEBUG',
|
"log_level_file": 'DEBUG',
|
||||||
# "console_color_output_enabled": False,
|
# "console_color_output_enabled": False,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
logn = Logger(log.get_logger_settings())
|
||||||
|
|
||||||
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
|
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||||
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
|
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
|
||||||
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
|
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
|
||||||
|
logn.lg.debug('[NORMAL N] Debug test: %s', log.logger.name)
|
||||||
|
logn.debug('[NORMAL N-] Debug test: %s', log.logger.name)
|
||||||
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
|
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
|
||||||
log.info('[NORMAL-] Info test: %s', log.logger.name)
|
log.info('[NORMAL-] Info test: %s', log.logger.name)
|
||||||
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
|
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
|
||||||
@@ -76,10 +79,13 @@ def main():
|
|||||||
print(f"Divied: {__test}")
|
print(f"Divied: {__test}")
|
||||||
except ZeroDivisionError as e:
|
except ZeroDivisionError as e:
|
||||||
log.logger.critical("Divison through zero: %s", e)
|
log.logger.critical("Divison through zero: %s", e)
|
||||||
log.exception("Divison through zero")
|
log.exception("Divison through zero: %s", e)
|
||||||
|
|
||||||
for handler in log.logger.handlers:
|
for handler in log.logger.handlers:
|
||||||
print(f"Handler (logger) {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
print(
|
||||||
|
f"** Handler (logger) {handler} [{handler.name}] -> "
|
||||||
|
f"{handler.level} -> {LoggingLevel.from_any(handler.level)}"
|
||||||
|
)
|
||||||
|
|
||||||
for key, handler in log.handlers.items():
|
for key, handler in log.handlers.items():
|
||||||
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
|
||||||
|
|||||||
291
tests/unit/iterator_handling/test_dict_helpers.py
Normal file
291
tests/unit/iterator_handling/test_dict_helpers.py
Normal file
@@ -0,0 +1,291 @@
|
|||||||
|
"""
|
||||||
|
tests for corelibs.iterator_handling.dict_helpers
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from typing import Any
|
||||||
|
from corelibs.iterator_handling.dict_helpers import mask
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_default_behavior():
|
||||||
|
"""Test masking with default mask_keys"""
|
||||||
|
data = {
|
||||||
|
"username": "john_doe",
|
||||||
|
"password": "secret123",
|
||||||
|
"email": "john@example.com",
|
||||||
|
"api_secret": "abc123",
|
||||||
|
"encryption_key": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["username"] == "john_doe"
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert result["email"] == "john@example.com"
|
||||||
|
assert result["api_secret"] == "***"
|
||||||
|
assert result["encryption_key"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_keys():
|
||||||
|
"""Test masking with custom mask_keys"""
|
||||||
|
data = {
|
||||||
|
"username": "john_doe",
|
||||||
|
"token": "abc123",
|
||||||
|
"api_key": "xyz789",
|
||||||
|
"password": "secret123"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=["token", "api"])
|
||||||
|
|
||||||
|
assert result["username"] == "john_doe"
|
||||||
|
assert result["token"] == "***"
|
||||||
|
assert result["api_key"] == "***"
|
||||||
|
assert result["password"] == "secret123" # Not masked with custom keys
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_mask_string():
|
||||||
|
"""Test masking with custom mask string"""
|
||||||
|
data = {"password": "secret123"}
|
||||||
|
|
||||||
|
result = mask(data, mask_str="[HIDDEN]")
|
||||||
|
|
||||||
|
assert result["password"] == "[HIDDEN]"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_case_insensitive():
|
||||||
|
"""Test that masking is case insensitive"""
|
||||||
|
data = {
|
||||||
|
"PASSWORD": "secret123",
|
||||||
|
"Secret_Key": "abc123",
|
||||||
|
"ENCRYPTION_data": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["PASSWORD"] == "***"
|
||||||
|
assert result["Secret_Key"] == "***"
|
||||||
|
assert result["ENCRYPTION_data"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_key_patterns():
|
||||||
|
"""Test different key matching patterns (start, end, contains)"""
|
||||||
|
data = {
|
||||||
|
"password_hash": "hash123", # starts with
|
||||||
|
"user_password": "secret123", # ends with
|
||||||
|
"my_secret_key": "abc123", # contains with edges
|
||||||
|
"secretvalue": "xyz789", # contains without edges
|
||||||
|
"startsecretvalue": "xyz123", # contains without edges
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["password_hash"] == "***"
|
||||||
|
assert result["user_password"] == "***"
|
||||||
|
assert result["my_secret_key"] == "***"
|
||||||
|
assert result["secretvalue"] == "***" # will mask beacuse starts with
|
||||||
|
assert result["startsecretvalue"] == "xyz123" # will not mask
|
||||||
|
assert result["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_custom_edges():
|
||||||
|
"""Test masking with custom edge characters"""
|
||||||
|
data = {
|
||||||
|
"my-secret-key": "abc123",
|
||||||
|
"my_secret_key": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, mask_str_edges="-")
|
||||||
|
|
||||||
|
assert result["my-secret-key"] == "***"
|
||||||
|
assert result["my_secret_key"] == "xyz789" # Underscore edges don't match
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_empty_edges():
|
||||||
|
"""Test masking with empty edge characters (substring matching)"""
|
||||||
|
data = {
|
||||||
|
"secretvalue": "abc123",
|
||||||
|
"mysecretkey": "xyz789",
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, mask_str_edges="")
|
||||||
|
|
||||||
|
assert result["secretvalue"] == "***"
|
||||||
|
assert result["mysecretkey"] == "***"
|
||||||
|
assert result["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_nested_dict():
|
||||||
|
"""Test masking nested dictionaries"""
|
||||||
|
data = {
|
||||||
|
"user": {
|
||||||
|
"name": "john",
|
||||||
|
"password": "secret123",
|
||||||
|
"profile": {
|
||||||
|
"email": "john@example.com",
|
||||||
|
"encryption_key": "abc123"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"api_secret": "xyz789"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["user"]["name"] == "john"
|
||||||
|
assert result["user"]["password"] == "***"
|
||||||
|
assert result["user"]["profile"]["email"] == "john@example.com"
|
||||||
|
assert result["user"]["profile"]["encryption_key"] == "***"
|
||||||
|
assert result["api_secret"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_lists():
|
||||||
|
"""Test masking lists and nested structures with lists"""
|
||||||
|
data = {
|
||||||
|
"users": [
|
||||||
|
{"name": "john", "password": "secret1"},
|
||||||
|
{"name": "jane", "password": "secret2"}
|
||||||
|
],
|
||||||
|
"secrets": ["secret1", "secret2", "secret3"]
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
print(f"R {result['secrets']}")
|
||||||
|
|
||||||
|
assert result["users"][0]["name"] == "john"
|
||||||
|
assert result["users"][0]["password"] == "***"
|
||||||
|
assert result["users"][1]["name"] == "jane"
|
||||||
|
assert result["users"][1]["password"] == "***"
|
||||||
|
assert result["secrets"] == ["***", "***", "***"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_mixed_types():
|
||||||
|
"""Test masking with different value types"""
|
||||||
|
data = {
|
||||||
|
"password": "string_value",
|
||||||
|
"secret_number": 12345,
|
||||||
|
"encryption_flag": True,
|
||||||
|
"secret_float": 3.14,
|
||||||
|
"password_none": None,
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert result["secret_number"] == "***"
|
||||||
|
assert result["encryption_flag"] == "***"
|
||||||
|
assert result["secret_float"] == "***"
|
||||||
|
assert result["password_none"] == "***"
|
||||||
|
assert result["normal_key"] == "normal_value"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_skip_true():
|
||||||
|
"""Test that skip=True returns original data unchanged"""
|
||||||
|
data = {
|
||||||
|
"password": "secret123",
|
||||||
|
"encryption_key": "abc123",
|
||||||
|
"normal_key": "normal_value"
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data, skip=True)
|
||||||
|
|
||||||
|
assert result == data
|
||||||
|
assert result is data # Should return the same object
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_empty_dict():
|
||||||
|
"""Test masking empty dictionary"""
|
||||||
|
data: dict[str, Any] = {}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_none_mask_keys():
|
||||||
|
"""Test explicit None mask_keys uses defaults"""
|
||||||
|
data = {"password": "secret123", "token": "abc123"}
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=None)
|
||||||
|
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert result["token"] == "abc123" # Not in default keys
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_empty_mask_keys():
|
||||||
|
"""Test empty mask_keys list"""
|
||||||
|
data = {"password": "secret123", "secret": "abc123"}
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=[])
|
||||||
|
|
||||||
|
assert result["password"] == "secret123"
|
||||||
|
assert result["secret"] == "abc123"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_complex_nested_structure():
|
||||||
|
"""Test masking complex nested structure"""
|
||||||
|
data = {
|
||||||
|
"config": {
|
||||||
|
"database": {
|
||||||
|
"host": "localhost",
|
||||||
|
"password": "db_secret",
|
||||||
|
"users": [
|
||||||
|
{"name": "admin", "password": "admin123"},
|
||||||
|
{"name": "user", "secret_key": "user456"}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"api": {
|
||||||
|
"endpoints": ["api1", "api2"],
|
||||||
|
"encryption_settings": {
|
||||||
|
"enabled": True,
|
||||||
|
"secret": "api_secret"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = mask(data)
|
||||||
|
|
||||||
|
assert result["config"]["database"]["host"] == "localhost"
|
||||||
|
assert result["config"]["database"]["password"] == "***"
|
||||||
|
assert result["config"]["database"]["users"][0]["name"] == "admin"
|
||||||
|
assert result["config"]["database"]["users"][0]["password"] == "***"
|
||||||
|
assert result["config"]["database"]["users"][1]["name"] == "user"
|
||||||
|
assert result["config"]["database"]["users"][1]["secret_key"] == "***"
|
||||||
|
assert result["config"]["api"]["endpoints"] == ["api1", "api2"]
|
||||||
|
assert result["config"]["api"]["encryption_settings"]["enabled"] is True
|
||||||
|
assert result["config"]["api"]["encryption_settings"]["secret"] == "***"
|
||||||
|
|
||||||
|
|
||||||
|
def test_mask_preserves_original_data():
|
||||||
|
"""Test that original data is not modified"""
|
||||||
|
original_data = {
|
||||||
|
"password": "secret123",
|
||||||
|
"username": "john_doe"
|
||||||
|
}
|
||||||
|
data_copy = original_data.copy()
|
||||||
|
|
||||||
|
result = mask(original_data)
|
||||||
|
|
||||||
|
assert original_data == data_copy # Original unchanged
|
||||||
|
assert result != original_data # Result is different
|
||||||
|
assert result["password"] == "***"
|
||||||
|
assert original_data["password"] == "secret123"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("mask_key,expected_keys", [
|
||||||
|
(["pass"], ["password", "user_pass", "my_pass_key"]),
|
||||||
|
(["key"], ["api_key", "secret_key", "my_key_value"]),
|
||||||
|
(["token"], ["token", "auth_token", "my_token_here"]),
|
||||||
|
])
|
||||||
|
def test_mask_parametrized_keys(mask_key: list[str], expected_keys: list[str]):
|
||||||
|
"""Parametrized test for different mask key patterns"""
|
||||||
|
data = {key: "value" for key in expected_keys}
|
||||||
|
data["normal_entry"] = "normal_value"
|
||||||
|
|
||||||
|
result = mask(data, mask_keys=mask_key)
|
||||||
|
|
||||||
|
for key in expected_keys:
|
||||||
|
assert result[key] == "***"
|
||||||
|
assert result["normal_entry"] == "normal_value"
|
||||||
Reference in New Issue
Block a user