Compare commits

...

19 Commits

Author SHA1 Message Date
Clemens Schwaighofer
2b8240c156 v0.18.1: bug fix for find_in_array_from_list search key check 2025-07-25 15:58:59 +09:00
Clemens Schwaighofer
abf4b7ac89 Bug fix for find_in_array_from_list because of keys order 2025-07-25 15:57:48 +09:00
Clemens Schwaighofer
9c49f83c16 v0.18.0: array_search deprecation in change for find_in_array_from_list with correct parameter order 2025-07-25 15:50:58 +09:00
Clemens Schwaighofer
3a625ed0ee Merge branch 'master' into development 2025-07-25 15:49:58 +09:00
Clemens Schwaighofer
2cfbf4bb90 Update data search for iterators
array_search name is deprecated
use find_in_array_from_list
- change parameter order
data (search in) comes before search_params list
- created a TypedDict for the array search params dict entry
2025-07-25 15:48:37 +09:00
Clemens Schwaighofer
5767533668 v0.17.0: exceptions handling added for csv file reading 2025-07-25 10:25:44 +09:00
Clemens Schwaighofer
24798f19ca Add CSV Exceptions 2025-07-25 10:23:52 +09:00
Clemens Schwaighofer
26f8249187 v0.16.0: trackeback call stack reader method fix 2025-07-24 10:53:44 +09:00
Clemens Schwaighofer
dcefa564da Fix stack stack traceback call
It now works correct with start and skip_last settings, the method is now called "call_stack"

Also added auto reset if no output (start too hight) and optional stack separator
2025-07-24 10:52:39 +09:00
Clemens Schwaighofer
edd35dccea Comment update for log class 2025-07-22 19:32:21 +09:00
Clemens Schwaighofer
ea527ea60c v0.15.0: Log class update with split up class with Logger sub class for pool/fork/thread worker setup 2025-07-18 14:18:19 +09:00
Clemens Schwaighofer
fd5e1db22b Change Log class and add simple Logger class without the init work
This Logger class can be used after the main Log class has been setup, eg in workers
inside pool/fork/thread runs

A new parent class holds all the public methods

init Logger class with "get_logger_settings" or with a dictionary Type LoggerInit that has
logger with type logging.Logger as mandatory and optional Queue entry
2025-07-18 14:15:00 +09:00
Clemens Schwaighofer
39e23faf7f dict mask helper test code update 2025-07-17 15:29:49 +09:00
Clemens Schwaighofer
de285b531a ToDo list update 2025-07-17 15:22:12 +09:00
Clemens Schwaighofer
0a29a592f9 v0.14.1: mask update to also work recusrive 2025-07-17 15:20:38 +09:00
Clemens Schwaighofer
e045b1d3b5 Add docstring for pytest file dict helpers 2025-07-17 15:19:05 +09:00
Clemens Schwaighofer
280e5fa861 Update the mask dict helper
It now goes recursive, is case insenstivie for keys and mask keys requests
Checks not for equal but for start/end or inside with edge character set

pytests added
2025-07-17 15:17:57 +09:00
Clemens Schwaighofer
472d3495b5 Add missing typedefs for regex constants 2025-07-17 13:32:35 +09:00
Clemens Schwaighofer
2778ac6870 in Log use the defined default levels for console and file 2025-07-16 11:06:38 +09:00
13 changed files with 917 additions and 331 deletions

View File

@@ -1,5 +1,5 @@
# ToDo list
- [ ] stub files .pyi
- [x] stub files .pyi
- [ ] Add tests for all, we need 100% test coverate
- [ ] Log: add custom format for "stack_correct" if set, this will override the normal stack block
- [x] Log: add custom format for "stack_correct" if set, this will override the normal stack block

View File

@@ -1,7 +1,7 @@
# MARK: Project info
[project]
name = "corelibs"
version = "0.14.0"
version = "0.18.1"
description = "Collection of utils for Python scripts"
readme = "README.md"
requires-python = ">=3.13"

View File

@@ -19,19 +19,19 @@ def compile_re(reg: str) -> re.Pattern[str]:
# email regex
EMAIL_BASIC_REGEX = r"""
EMAIL_BASIC_REGEX: str = r"""
^[A-Za-z0-9!#$%&'*+\-\/=?^_`{|}~][A-Za-z0-9!#$%:\(\)&'*+\-\/=?^_`{|}~\.]{0,63}
@(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[a-zA-Z]{2,6}$
"""
# Domain regex with localhost
DOMAIN_WITH_LOCALHOST_REGEX = r"""
DOMAIN_WITH_LOCALHOST_REGEX: str = r"""
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})$
"""
# domain regex with loclhost and optional port
DOMAIN_WITH_LOCALHOST_PORT_REGEX = r"""
DOMAIN_WITH_LOCALHOST_PORT_REGEX: str = r"""
^(?:localhost|(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,})(?::\d+)?$
"""
# Domain, no localhost
DOMAIN_REGEX = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$"
DOMAIN_REGEX: str = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(?:\.[A-Za-z0-9-]{1,63}(?<!-))*\.[A-Za-z]{2,}$"
# __END__

View File

@@ -6,28 +6,39 @@ import traceback
import os
def traceback_call_str(start: int = 2, depth: int = 1):
def call_stack(
start: int = 0,
skip_last: int = -1,
separator: str = ' -> ',
reset_start_if_empty: bool = False
) -> str:
"""
get the trace for the last entry
Keyword Arguments:
start {int} -- _description_ (default: {2})
depth {int} -- _description_ (default: {1})
start {int} -- start, if too might output will empty until reset_start_if_empty is set (default: {0})
skip_last {int} -- how many of the last are skipped, defaults to -1 for current method (default: {-1})
seperator {str} -- add stack separator, if empty defaults to ' -> ' (default: { -> })
reset_start_if_empty {bool} -- if no stack returned because of too high start,
reset to 0 for full read (default: {False})
Returns:
_type_ -- _description_
str -- _description_
"""
# can't have more than in the stack for depth
depth = min(depth, start)
depth = start - depth
# 0 is full stack length from start
if depth == 0:
stack = traceback.extract_stack()[-start:]
else:
stack = traceback.extract_stack()[-start:-depth]
return ' -> '.join(
f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}"
for f in stack
)
# stack = traceback.extract_stack()[start:depth]
# how many of the last entries we skip (so we do not get self), default is -1
# start cannot be negative
if skip_last > 0:
skip_last = skip_last * -1
stack = traceback.extract_stack()
__stack = stack[start:skip_last]
# start possible to high, reset start to 0
if not __stack and reset_start_if_empty:
start = 0
__stack = stack[start:skip_last]
if not separator:
separator = ' -> '
# print(f"* HERE: {dump_data(stack)}")
return f"{separator}".join(f"{os.path.basename(f.filename)}:{f.name}:{f.lineno}" for f in __stack)
# __END__

View File

@@ -0,0 +1,23 @@
"""
Exceptions for csv file reading and processing
"""
class NoCsvReader(Exception):
"""
CSV reader is none
"""
class CsvHeaderDataMissing(Exception):
"""
The csv reader returned None as headers, the header column in the csv file is missing
"""
class CompulsoryCsvHeaderCheckFailed(Exception):
"""
raise if the header is not matching to the excpeted values
"""
# __END__

View File

@@ -2,23 +2,40 @@
wrapper around search path
"""
from typing import Any
from typing import Any, TypedDict, NotRequired
from warnings import deprecated
class ArraySearchList(TypedDict):
"""find in array from list search dict"""
key: str
value: str | bool | int | float | list[str | None]
case_sensitive: NotRequired[bool]
@deprecated("Use find_in_array_from_list()")
def array_search(
search_params: list[dict[str, str | bool | list[str | None]]],
search_params: list[ArraySearchList],
data: list[dict[str, Any]],
return_index: bool = False
) -> list[dict[str, Any]]:
"""depreacted, old call order"""
return find_in_array_from_list(data, search_params, return_index)
def find_in_array_from_list(
data: list[dict[str, Any]],
search_params: list[ArraySearchList],
return_index: bool = False
) -> list[dict[str, Any]]:
"""
search in an array of dicts with an array of Key/Value set
search in an list of dicts with an list of Key/Value set
all Key/Value sets must match
Value set can be list for OR match
option: case_senstive: default True
Args:
search_params (list): List of search params in "Key"/"Value" lists with options
data (list): data to search in, must be a list
search_params (list): List of search params in "key"/"value" lists with options
return_index (bool): return index of list [default False]
Raises:
@@ -32,18 +49,20 @@ def array_search(
"""
if not isinstance(search_params, list): # type: ignore
raise ValueError("search_params must be a list")
keys = []
keys: list[str] = []
# check that key and value exist and are set
for search in search_params:
if not search.get('Key') or not search.get('Value'):
if not search.get('key') or not search.get('value'):
raise KeyError(
f"Either Key '{search.get('Key', '')}' or "
f"Value '{search.get('Value', '')}' is missing or empty"
f"Either Key '{search.get('key', '')}' or "
f"Value '{search.get('value', '')}' is missing or empty"
)
# if double key -> abort
if search.get("Key") in keys:
if search.get("key") in keys:
raise KeyError(
f"Key {search.get('Key', '')} already exists in search_params"
f"Key {search.get('key', '')} already exists in search_params"
)
keys.append(str(search['key']))
return_items: list[dict[str, Any]] = []
for si_idx, search_item in enumerate(data):
@@ -55,20 +74,20 @@ def array_search(
# lower case left side
# TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"]
if search.get("case_sensitive", True) is False:
search_value = search_item.get(str(search['Key']), "").lower()
search_value = search_item.get(str(search['key']), "").lower()
else:
search_value = search_item.get(str(search['Key']), "")
search_value = search_item.get(str(search['key']), "")
# lower case right side
if isinstance(search['Value'], list):
if isinstance(search['value'], list):
search_in = [
str(k).lower()
if search.get("case_sensitive", True) is False else k
for k in search['Value']
]
str(k).lower()
if search.get("case_sensitive", True) is False else k
for k in search['value']
]
elif search.get("case_sensitive", True) is False:
search_in = str(search['Value']).lower()
search_in = str(search['value']).lower()
else:
search_in = search['Value']
search_in = search['value']
# compare check
if (
(

View File

@@ -3,26 +3,36 @@ Dict helpers
"""
from typing import Any
from typing import TypeAlias, Union, Dict, List, Any, cast
# definitions for the mask run below
MaskableValue: TypeAlias = Union[str, int, float, bool, None]
NestedDict: TypeAlias = Dict[str, Union[MaskableValue, List[Any], 'NestedDict']]
ProcessableValue: TypeAlias = Union[MaskableValue, List[Any], NestedDict]
def mask(
data_set: dict[str, str],
data_set: dict[str, Any],
mask_keys: list[str] | None = None,
mask_str: str = "***",
mask_str_edges: str = '_',
skip: bool = False
) -> dict[str, str]:
) -> dict[str, Any]:
"""
mask data for output
Checks if mask_keys list exist in any key in the data set either from the start or at the end
Use the mask_str_edges to define how searches inside a string should work. Default it must start
and end with '_', remove to search string in string
Arguments:
data_set {dict[str, str]} -- _description_
Keyword Arguments:
mask_keys {list[str] | None} -- _description_ (default: {None})
mask_str {str} -- _description_ (default: {"***"})
skip {bool} -- _description_ (default: {False})
mask_str_edges {str} -- _description_ (default: {"_"})
skip {bool} -- if set to true skip (default: {False})
Returns:
dict[str, str] -- _description_
@@ -30,29 +40,46 @@ def mask(
if skip is True:
return data_set
if mask_keys is None:
mask_keys = ["password", "secret"]
mask_keys = ["encryption", "password", "secret"]
else:
# make sure it is lower case
mask_keys = [mask_key.lower() for mask_key in mask_keys]
def should_mask_key(key: str) -> bool:
"""Check if a key should be masked"""
__key_lower = key.lower()
return any(
__key_lower.startswith(mask_key) or
__key_lower.endswith(mask_key) or
f"{mask_str_edges}{mask_key}{mask_str_edges}" in __key_lower
for mask_key in mask_keys
)
def mask_recursive(obj: ProcessableValue) -> ProcessableValue:
"""Recursively mask values in nested structures"""
if isinstance(obj, dict):
return {
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
for key, value in obj.items()
}
if isinstance(obj, list):
return [mask_recursive(item) for item in obj]
return obj
def mask_value(value: Any) -> Any:
"""Handle masking based on value type"""
if isinstance(value, list):
# Mask each individual value in the list
return [mask_str for _ in cast('list[Any]', value)]
if isinstance(value, dict):
# Recursively process the dictionary instead of masking the whole thing
return mask_recursive(cast('ProcessableValue', value))
# Mask primitive values
return mask_str
return {
key: mask_str
if any(key.startswith(mask_key) or key.endswith(mask_key) for mask_key in mask_keys) else value
key: mask_value(value) if should_mask_key(key) else mask_recursive(value)
for key, value in data_set.items()
}
def set_entry(dict_set: dict[str, Any], key: str, value_set: Any) -> dict[str, Any]:
"""
set a new entry in the dict set
Arguments:
key {str} -- _description_
dict_set {dict[str, Any]} -- _description_
value_set {Any} -- _description_
Returns:
dict[str, Any] -- _description_
"""
if not dict_set.get(key):
dict_set[key] = {}
dict_set[key] = value_set
return dict_set
# __END__

View File

@@ -12,7 +12,7 @@ from pathlib import Path
from typing import MutableMapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
from corelibs.string_handling.text_colors import Colors
from corelibs.debug_handling.debug_helpers import traceback_call_str
from corelibs.debug_handling.debug_helpers import call_stack
if TYPE_CHECKING:
from multiprocessing import Queue
@@ -20,12 +20,7 @@ if TYPE_CHECKING:
# MARK: Log settings TypedDict
class LogSettings(TypedDict):
"""
log settings
Arguments:
TypedDict {_type_} -- _description_
"""
"""log settings, for Log setup"""
log_level_console: LoggingLevel
log_level_file: LoggingLevel
console_enabled: bool
@@ -35,6 +30,12 @@ class LogSettings(TypedDict):
log_queue: 'Queue[str] | None'
class LoggerInit(TypedDict):
"""for Logger init"""
logger: logging.Logger
log_queue: 'Queue[str] | None'
# MARK: Custom color filter
class CustomConsoleFormatter(logging.Formatter):
"""
@@ -75,12 +76,15 @@ class CustomConsoleFormatter(logging.Formatter):
# TODO: add custom handlers for stack_trace, if not set fill with %(filename)s:%(funcName)s:%(lineno)d
# hasattr(record, 'stack_trace')
# also for something like "context" where we add an array of anything to a message
class CustomHandlerFilter(logging.Filter):
"""
Add a custom handler for filtering
"""
HANDLER_NAME_FILTER_EXCEPTION: str = 'console'
def __init__(self, handler_name: str, filter_exceptions: bool = False):
super().__init__(name=handler_name)
self.handler_name = handler_name
@@ -88,7 +92,7 @@ class CustomHandlerFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
# if console and exception do not show
if self.handler_name == 'console' and self.filter_exceptions:
if self.handler_name == self.HANDLER_NAME_FILTER_EXCEPTION and self.filter_exceptions:
return record.levelname != "EXCEPTION"
# if cnosole entry is true and traget file filter
if hasattr(record, 'console') and getattr(record, 'console') is True and self.handler_name == 'file':
@@ -99,8 +103,278 @@ class CustomHandlerFilter(logging.Filter):
# return record.levelname != "EXCEPTION"
# MARK: Log class
class Log:
# MARK: Parent class
class LogParent:
"""
Parent class with general methods
used by Log and Logger
"""
# spacer lenght characters and the character
SPACER_CHAR: str = '='
SPACER_LENGTH: int = 32
def __init__(self):
self.logger: logging.Logger
self.log_queue: 'Queue[str] | None' = None
self.handlers: dict[str, Any] = {}
# FIXME: we need to add a custom formater to add stack level listing if we want to
# Important note, although they exist, it is recommended to use self.logger.NAME directly
# so that the correct filename, method and row number is set
# for > 50 use logger.log(LoggingLevel.<LEVEL>.value, ...)
# for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True)
# MARK: log message
def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None):
"""log general"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.log(level, msg, *args, extra=extra, stacklevel=2)
# MARK: DEBUG 10
def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""debug"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.debug(msg, *args, extra=extra, stacklevel=2)
# MARK: INFO 20
def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""info"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.info(msg, *args, extra=extra, stacklevel=2)
# MARK: WARNING 30
def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""warning"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.warning(msg, *args, extra=extra, stacklevel=2)
# MARK: ERROR 40
def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""error"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.error(msg, *args, extra=extra, stacklevel=2)
# MARK: CRITICAL 50
def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""critcal"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.critical(msg, *args, extra=extra, stacklevel=2)
# MARK: ALERT 55
def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""alert"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
# extra_dict = dict(extra)
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2)
# MARK: EMERGECNY: 60
def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""emergency"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
# MARK: EXCEPTION: 70
def exception(
self,
msg: object, *args: object, extra: MutableMapping[str, object] | None = None,
log_error: bool = True
) -> None:
"""
log on exceotion level, this is log.exception, but logs with a new level
Args:
msg (object): _description_
*args (object): arguments for msg
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
log_error: (bool): If set to false will not write additional error message for console (Default True)
"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = call_stack(skip_last=2)
# write to console first with extra flag for filtering in file
if log_error:
self.logger.log(
LoggingLevel.ERROR.value,
f"<=EXCEPTION> {msg}", *args, extra=dict(extra) | {'console': True}, stacklevel=2
)
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
def break_line(self, info: str = "BREAK"):
"""
add a break line as info level
Keyword Arguments:
info {str} -- _description_ (default: {"BREAK"})
"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
# MARK: queue handling
def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool:
"""
Flush all pending messages
Keyword Arguments:
handler_name {str | None} -- _description_ (default: {None})
timeout {float} -- _description_ (default: {2.0})
Returns:
bool -- _description_
"""
if not self.log_queue:
return False
try:
# Wait for queue to be processed
start_time = time.time()
while not self.log_queue.empty() and (time.time() - start_time) < timeout:
time.sleep(0.01)
# Flush all handlers or handler given
if handler_name:
try:
self.handlers[handler_name].flush()
except IndexError:
pass
else:
for handler in self.handlers.values():
handler.flush()
except OSError:
return False
return True
# MARK: log level handling
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
"""
set the logging level for a handler
Arguments:
handler {str} -- _description_
log_level {LoggingLevel} -- _description_
Returns:
bool -- _description_
"""
try:
# flush queue befoe changing logging level
self.flush(handler_name)
self.handlers[handler_name].setLevel(log_level.name)
return True
except IndexError:
if self.logger:
self.logger.error('Handler %s not found, cannot change log level', handler_name)
return False
except AttributeError:
if self.logger:
self.logger.error(
'Cannot change to log level %s for handler %s, log level invalid',
LoggingLevel.name, handler_name
)
return False
def get_log_level(self, handler_name: str) -> LoggingLevel:
"""
gettthe logging level for a handler
Arguments:
handler_name {str} -- _description_
Returns:
LoggingLevel -- _description_
"""
try:
return LoggingLevel.from_any(self.handlers[handler_name].level)
except IndexError:
return LoggingLevel.NOTSET
@staticmethod
def validate_log_level(log_level: Any) -> bool:
"""
if the log level is invalid will return false, else return true
Args:
log_level (Any): _description_
Returns:
bool: _description_
"""
try:
_ = LoggingLevel.from_any(log_level).value
return True
except ValueError:
return False
@staticmethod
def get_log_level_int(log_level: Any) -> int:
"""
Return log level as INT
If invalid returns the default log level
Arguments:
log_level {Any} -- _description_
Returns:
int -- _description_
"""
try:
return LoggingLevel.from_any(log_level).value
except ValueError:
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
# MARK: Logger
class Logger(LogParent):
"""
The class we can pass on to other clases without re-init the class itself
NOTE: if no queue object is handled over the logging level change might not take immediate effect
"""
def __init__(self, logger_settings: LoggerInit):
LogParent.__init__(self)
self.logger = logger_settings['logger']
self.lg = self.logger
self.l = self.logger
self.handlers = {str(_handler.name): _handler for _handler in self.logger.handlers}
self.log_queue = logger_settings['log_queue']
# MARK: LogSetup class
class Log(LogParent):
"""
logger setup
"""
@@ -114,8 +388,8 @@ class Log:
DEFAULT_LOG_LEVEL_CONSOLE: LoggingLevel = LoggingLevel.WARNING
# default settings
DEFAULT_LOG_SETTINGS: LogSettings = {
"log_level_console": LoggingLevel.WARNING,
"log_level_file": LoggingLevel.DEBUG,
"log_level_console": DEFAULT_LOG_LEVEL_CONSOLE,
"log_level_file": DEFAULT_LOG_LEVEL_FILE,
"console_enabled": True,
"console_color_output_enabled": True,
"add_start_info": True,
@@ -131,6 +405,7 @@ class Log:
log_settings: dict[str, 'LoggingLevel | str | bool | None | Queue[str]'] | LogSettings | None = None,
other_handlers: dict[str, Any] | None = None
):
LogParent.__init__(self)
# add new level for alert, emergecny and exception
logging.addLevelName(LoggingLevel.ALERT.value, LoggingLevel.ALERT.name)
logging.addLevelName(LoggingLevel.EMERGENCY.value, LoggingLevel.EMERGENCY.name)
@@ -164,12 +439,12 @@ class Log:
# Any is logging.StreamHandler, logging.FileHandler and all logging.handlers.*
self.handlers: dict[str, Any] = {}
self.add_handler('file_handler', self.__create_timed_rotating_file_handler(
self.log_settings['log_level_file'], log_path)
'file_handler', self.log_settings['log_level_file'], log_path)
)
if self.log_settings['console_enabled']:
# console
self.add_handler('stream_handler', self.__create_console_handler(
self.log_settings['log_level_console'])
'stream_handler', self.log_settings['log_level_console'])
)
# add other handlers,
if other_handlers is not None:
@@ -260,7 +535,8 @@ class Log:
# MARK: console handler
def __create_console_handler(
self, log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
self, handler_name: str,
log_level_console: LoggingLevel = LoggingLevel.WARNING, filter_exceptions: bool = True
) -> logging.StreamHandler[TextIO]:
# console logger
if not self.validate_log_level(log_level_console):
@@ -280,7 +556,7 @@ class Log:
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
else:
formatter_console = logging.Formatter(format_string, datefmt=format_date)
console_handler.set_name('console')
console_handler.set_name(handler_name)
console_handler.setLevel(log_level_console.name)
# do not show exceptions logs on console
console_handler.addFilter(CustomHandlerFilter('console', filter_exceptions))
@@ -289,7 +565,8 @@ class Log:
# MARK: file handler
def __create_timed_rotating_file_handler(
self, log_level_file: LoggingLevel, log_path: Path,
self, handler_name: str,
log_level_file: LoggingLevel, log_path: Path,
when: str = "D", interval: int = 1, backup_count: int = 0
) -> logging.handlers.TimedRotatingFileHandler:
# file logger
@@ -322,7 +599,7 @@ class Log:
),
datefmt="%Y-%m-%dT%H:%M:%S",
)
file_handler.set_name('file_timed_rotate')
file_handler.set_name(handler_name)
file_handler.setLevel(log_level_file.name)
# do not show errors flagged with console (they are from exceptions)
file_handler.addFilter(CustomHandlerFilter('file'))
@@ -347,6 +624,14 @@ class Log:
)
self.listener.start()
def stop_listener(self):
"""
stop the listener
"""
if self.listener is not None:
self.flush()
self.listener.stop()
# MARK: init main log
def __init_log(self, log_name: str) -> None:
"""
@@ -389,250 +674,16 @@ class Log:
return root_logger
# FIXME: we need to add a custom formater to add stack level listing if we want to
# Important note, although they exist, it is recommended to use self.logger.NAME directly
# so that the correct filename, method and row number is set
# for > 50 use logger.log(LoggingLevel.<LEVEL>.value, ...)
# for exception logger.log(LoggingLevel.EXCEPTION.value, ..., execInfo=True)
# MARK: log message
def log(self, level: int, msg: object, *args: object, extra: MutableMapping[str, object] | None = None):
"""log general"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.log(level, msg, *args, extra=extra, stacklevel=2)
# MARK: DEBUG 10
def debug(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""debug"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.debug(msg, *args, extra=extra, stacklevel=2)
# MARK: INFO 20
def info(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""info"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.info(msg, *args, extra=extra, stacklevel=2)
# MARK: WARNING 30
def warning(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""warning"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.warning(msg, *args, extra=extra, stacklevel=2)
# MARK: ERROR 40
def error(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""error"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.error(msg, *args, extra=extra, stacklevel=2)
# MARK: CRITICAL 50
def critical(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""critcal"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.critical(msg, *args, extra=extra, stacklevel=2)
# MARK: ALERT 55
def alert(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""alert"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
# extra_dict = dict(extra)
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.log(LoggingLevel.ALERT.value, msg, *args, extra=extra, stacklevel=2)
# MARK: EMERGECNY: 60
def emergency(self, msg: object, *args: object, extra: MutableMapping[str, object] | None = None) -> None:
"""emergency"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
self.logger.log(LoggingLevel.EMERGENCY.value, msg, *args, extra=extra, stacklevel=2)
# MARK: EXCEPTION: 70
def exception(
self,
msg: object, *args: object, extra: MutableMapping[str, object] | None = None,
log_error: bool = True
) -> None:
def get_logger_settings(self) -> LoggerInit:
"""
log on exceotion level, this is log.exception, but logs with a new level
Args:
msg (object): _description_
*args (object): arguments for msg
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
log_error: (bool): If set to false will not write additional error message for console (Default True)
"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
if extra is None:
extra = {}
extra['stack_trace'] = traceback_call_str(start=3)
# write to console first with extra flag for filtering in file
if log_error:
self.logger.log(
LoggingLevel.ERROR.value,
f"<=EXCEPTION> {msg}", *args, extra=dict(extra) | {'console': True}, stacklevel=2
)
self.logger.log(LoggingLevel.EXCEPTION.value, msg, *args, exc_info=True, extra=extra, stacklevel=2)
# MARK: break line
def break_line(self, info: str = "BREAK"):
"""
add a break line as info level
Keyword Arguments:
info {str} -- _description_ (default: {"BREAK"})
"""
if not hasattr(self, 'logger'):
raise ValueError('Logger is not yet initialized')
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
# MARK: queue handling
def flush(self, handler_name: str | None = None, timeout: float = 2.0) -> bool:
"""
Flush all pending messages
Keyword Arguments:
handler_name {str | None} -- _description_ (default: {None})
timeout {float} -- _description_ (default: {2.0})
get the logger settings we need to init the Logger class
Returns:
bool -- _description_
LoggerInit -- _description_
"""
if not self.listener or not self.log_queue:
return False
try:
# Wait for queue to be processed
start_time = time.time()
while not self.log_queue.empty() and (time.time() - start_time) < timeout:
time.sleep(0.01)
# Flush all handlers or handler given
if handler_name:
try:
self.handlers[handler_name].flush()
except IndexError:
pass
else:
for handler in self.handlers.values():
handler.flush()
except OSError:
return False
return True
def stop_listener(self):
"""
stop the listener
"""
if self.listener is not None:
self.flush()
self.listener.stop()
# MARK: log level handling
def set_log_level(self, handler_name: str, log_level: LoggingLevel) -> bool:
"""
set the logging level for a handler
Arguments:
handler {str} -- _description_
log_level {LoggingLevel} -- _description_
Returns:
bool -- _description_
"""
try:
# flush queue befoe changing logging level
self.flush(handler_name)
self.handlers[handler_name].setLevel(log_level.name)
return True
except IndexError:
if self.logger:
self.logger.error('Handler %s not found, cannot change log level', handler_name)
return False
except AttributeError:
if self.logger:
self.logger.error(
'Cannot change to log level %s for handler %s, log level invalid',
LoggingLevel.name, handler_name
)
return False
def get_log_level(self, handler_name: str) -> LoggingLevel:
"""
gettthe logging level for a handler
Arguments:
handler_name {str} -- _description_
Returns:
LoggingLevel -- _description_
"""
try:
return self.handlers[handler_name]
except IndexError:
return LoggingLevel.NOTSET
@staticmethod
def validate_log_level(log_level: Any) -> bool:
"""
if the log level is invalid will return false, else return true
Args:
log_level (Any): _description_
Returns:
bool: _description_
"""
try:
_ = LoggingLevel.from_any(log_level).value
return True
except ValueError:
return False
@staticmethod
def get_log_level_int(log_level: Any) -> int:
"""
Return log level as INT
If invalid returns the default log level
Arguments:
log_level {Any} -- _description_
Returns:
int -- _description_
"""
try:
return LoggingLevel.from_any(log_level).value
except ValueError:
return LoggingLevel.from_string(Log.DEFAULT_LOG_LEVEL.name).value
return {
"logger": self.logger,
"log_queue": self.log_queue
}
# __END__

View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""
Search data tests
iterator_handling.data_search
"""
from corelibs.debug_handling.dump_data import dump_data
from corelibs.iterator_handling.data_search import find_in_array_from_list, ArraySearchList
def main() -> None:
"""
Comment
"""
data = [
{
"lookup_value_p": "A01",
"lookup_value_c": "B01",
"replace_value": "R01",
},
{
"lookup_value_p": "A02",
"lookup_value_c": "B02",
"replace_value": "R02",
},
]
test_foo = ArraySearchList(
key = "lookup_value_p",
value = "A01"
)
print(test_foo)
search: list[ArraySearchList] = [
{
"key": "lookup_value_p",
"value": "A01"
},
{
"key": "lookup_value_c",
"value": "B01"
}
]
result = find_in_array_from_list(data, search)
print(f"Search {dump_data(search)} -> {dump_data(result)}")
if __name__ == "__main__":
main()
# __END__

View File

@@ -0,0 +1,106 @@
"""
Iterator helper testing
"""
from corelibs.debug_handling.dump_data import dump_data
from corelibs.iterator_handling.dict_helpers import mask
def __mask():
data = {
# "user": "john",
# "encryption_key": "Secret key",
# "ENCRYPTION.TEST": "Secret key test",
# "inside_password_test": "Hide this",
"password": ["secret1", "secret2"], # List value gets masked
# "config": {
# "db_password": {"primary": "secret", "backup": "secret2"}, # Dict value gets masked
# "api_keys": ["key1", "key2", "key3"] # List value gets masked
# },
# "items": [ # List value that doesn't get masked, but gets processed recursively
# {"name": "item1", "secret_key": "itemsecret"},
# {"name": "item2", "passwords": ["pass1", "pass2"]}
# ],
# "normal_list": ["item1", "item2", "item3"] # Normal list, not masked
}
data = {
"config": {
# "password": ["secret1", "secret2"],
# "password_other": {"password": ["secret1", "secret2"]},
# "database": {
# "host": "localhost",
# "password": "db_secret",
# "users": [
# {"name": "admin", "password": "admin123"},
# {"name": "user", "secret_key": "user456"}
# ]
# },
# "api": {
# # "endpoints": ["api1", "api2"],
# "encryption_settings": {
# "enabled": True,
# "secret": "api_secret"
# }
# }
"secret_key": "normal_value",
"api_key": "normal_value",
"my_key_value": "normal_value",
}
}
data = {
"basic": {
"log_level_console": "DEBUG",
"log_level_file": "DEBUG",
"storage_interface": "sqlite",
"content_start_date": "2023-1-1",
"encryption_key": "ENCRYPTION_KEY"
},
"email": {
"alert_email": [
"test+z-sd@tequila.jp"
]
},
"poller": {
"max_forks": "1",
"interface": "Zac"
},
"pusher": {
"max_forks": "3",
"interface": "Screendragon"
},
"api:Zac": {
"type": "zac",
"client_id": "oro_zac_demo",
"client_secret": "CLIENT_SECRET",
"username": "zacuser",
"password": "ZACuser3",
"hostname": "e-gra2.zac.ai",
"appname": "e-gra2_api_trial",
"api_path": "b/api/v2"
},
"api:Screendragon": {
"type": "screendragon",
"client_id": "omniprostaging",
"encryption_client": "SOME_SECRET",
"client_encryption": "SOME_SECRET",
"secret_client": "SOME_SECRET",
"client_secret": "SOME_SECRET",
"hostname": "omniprostaging.screendragon.com",
"appname": "sdapi",
"api_path": "api"
}
}
result = mask(data)
print(f"** In: {dump_data(data)}")
print(f"===> Masked: {dump_data(result)}")
def main():
"""
Test: corelibs.string_handling.string_helpers
"""
__mask()
if __name__ == "__main__":
main()

View File

@@ -5,7 +5,7 @@ Log logging_handling.log testing
# import atexit
from pathlib import Path
# this is for testing only
from corelibs.logging_handling.log import Log
from corelibs.logging_handling.log import Log, Logger
from corelibs.logging_handling.logging_level_handling.logging_level import LoggingLevel
@@ -18,16 +18,19 @@ def main():
log_path=script_path.joinpath('log', 'test.log'),
log_name="Test Log",
log_settings={
# "log_level_console": 'DEBUG',
"log_level_console": None,
"log_level_console": 'DEBUG',
# "log_level_console": None,
"log_level_file": 'DEBUG',
# "console_color_output_enabled": False,
}
)
logn = Logger(log.get_logger_settings())
log.logger.debug('[NORMAL] Debug test: %s', log.logger.name)
log.lg.debug('[NORMAL] Debug test: %s', log.logger.name)
log.debug('[NORMAL-] Debug test: %s', log.logger.name)
logn.lg.debug('[NORMAL N] Debug test: %s', log.logger.name)
logn.debug('[NORMAL N-] Debug test: %s', log.logger.name)
log.logger.info('[NORMAL] Info test: %s', log.logger.name)
log.info('[NORMAL-] Info test: %s', log.logger.name)
log.logger.warning('[NORMAL] Warning test: %s', log.logger.name)
@@ -79,7 +82,10 @@ def main():
log.exception("Divison through zero: %s", e)
for handler in log.logger.handlers:
print(f"Handler (logger) {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")
print(
f"** Handler (logger) {handler} [{handler.name}] -> "
f"{handler.level} -> {LoggingLevel.from_any(handler.level)}"
)
for key, handler in log.handlers.items():
print(f"Handler (handlers) [{key}] {handler} -> {handler.level} -> {LoggingLevel.from_any(handler.level)}")

View File

@@ -0,0 +1,291 @@
"""
tests for corelibs.iterator_handling.dict_helpers
"""
import pytest
from typing import Any
from corelibs.iterator_handling.dict_helpers import mask
def test_mask_default_behavior():
"""Test masking with default mask_keys"""
data = {
"username": "john_doe",
"password": "secret123",
"email": "john@example.com",
"api_secret": "abc123",
"encryption_key": "xyz789"
}
result = mask(data)
assert result["username"] == "john_doe"
assert result["password"] == "***"
assert result["email"] == "john@example.com"
assert result["api_secret"] == "***"
assert result["encryption_key"] == "***"
def test_mask_custom_keys():
"""Test masking with custom mask_keys"""
data = {
"username": "john_doe",
"token": "abc123",
"api_key": "xyz789",
"password": "secret123"
}
result = mask(data, mask_keys=["token", "api"])
assert result["username"] == "john_doe"
assert result["token"] == "***"
assert result["api_key"] == "***"
assert result["password"] == "secret123" # Not masked with custom keys
def test_mask_custom_mask_string():
"""Test masking with custom mask string"""
data = {"password": "secret123"}
result = mask(data, mask_str="[HIDDEN]")
assert result["password"] == "[HIDDEN]"
def test_mask_case_insensitive():
"""Test that masking is case insensitive"""
data = {
"PASSWORD": "secret123",
"Secret_Key": "abc123",
"ENCRYPTION_data": "xyz789"
}
result = mask(data)
assert result["PASSWORD"] == "***"
assert result["Secret_Key"] == "***"
assert result["ENCRYPTION_data"] == "***"
def test_mask_key_patterns():
"""Test different key matching patterns (start, end, contains)"""
data = {
"password_hash": "hash123", # starts with
"user_password": "secret123", # ends with
"my_secret_key": "abc123", # contains with edges
"secretvalue": "xyz789", # contains without edges
"startsecretvalue": "xyz123", # contains without edges
"normal_key": "normal_value"
}
result = mask(data)
assert result["password_hash"] == "***"
assert result["user_password"] == "***"
assert result["my_secret_key"] == "***"
assert result["secretvalue"] == "***" # will mask beacuse starts with
assert result["startsecretvalue"] == "xyz123" # will not mask
assert result["normal_key"] == "normal_value"
def test_mask_custom_edges():
"""Test masking with custom edge characters"""
data = {
"my-secret-key": "abc123",
"my_secret_key": "xyz789"
}
result = mask(data, mask_str_edges="-")
assert result["my-secret-key"] == "***"
assert result["my_secret_key"] == "xyz789" # Underscore edges don't match
def test_mask_empty_edges():
"""Test masking with empty edge characters (substring matching)"""
data = {
"secretvalue": "abc123",
"mysecretkey": "xyz789",
"normal_key": "normal_value"
}
result = mask(data, mask_str_edges="")
assert result["secretvalue"] == "***"
assert result["mysecretkey"] == "***"
assert result["normal_key"] == "normal_value"
def test_mask_nested_dict():
"""Test masking nested dictionaries"""
data = {
"user": {
"name": "john",
"password": "secret123",
"profile": {
"email": "john@example.com",
"encryption_key": "abc123"
}
},
"api_secret": "xyz789"
}
result = mask(data)
assert result["user"]["name"] == "john"
assert result["user"]["password"] == "***"
assert result["user"]["profile"]["email"] == "john@example.com"
assert result["user"]["profile"]["encryption_key"] == "***"
assert result["api_secret"] == "***"
def test_mask_lists():
"""Test masking lists and nested structures with lists"""
data = {
"users": [
{"name": "john", "password": "secret1"},
{"name": "jane", "password": "secret2"}
],
"secrets": ["secret1", "secret2", "secret3"]
}
result = mask(data)
print(f"R {result['secrets']}")
assert result["users"][0]["name"] == "john"
assert result["users"][0]["password"] == "***"
assert result["users"][1]["name"] == "jane"
assert result["users"][1]["password"] == "***"
assert result["secrets"] == ["***", "***", "***"]
def test_mask_mixed_types():
"""Test masking with different value types"""
data = {
"password": "string_value",
"secret_number": 12345,
"encryption_flag": True,
"secret_float": 3.14,
"password_none": None,
"normal_key": "normal_value"
}
result = mask(data)
assert result["password"] == "***"
assert result["secret_number"] == "***"
assert result["encryption_flag"] == "***"
assert result["secret_float"] == "***"
assert result["password_none"] == "***"
assert result["normal_key"] == "normal_value"
def test_mask_skip_true():
"""Test that skip=True returns original data unchanged"""
data = {
"password": "secret123",
"encryption_key": "abc123",
"normal_key": "normal_value"
}
result = mask(data, skip=True)
assert result == data
assert result is data # Should return the same object
def test_mask_empty_dict():
"""Test masking empty dictionary"""
data: dict[str, Any] = {}
result = mask(data)
assert result == {}
def test_mask_none_mask_keys():
"""Test explicit None mask_keys uses defaults"""
data = {"password": "secret123", "token": "abc123"}
result = mask(data, mask_keys=None)
assert result["password"] == "***"
assert result["token"] == "abc123" # Not in default keys
def test_mask_empty_mask_keys():
"""Test empty mask_keys list"""
data = {"password": "secret123", "secret": "abc123"}
result = mask(data, mask_keys=[])
assert result["password"] == "secret123"
assert result["secret"] == "abc123"
def test_mask_complex_nested_structure():
"""Test masking complex nested structure"""
data = {
"config": {
"database": {
"host": "localhost",
"password": "db_secret",
"users": [
{"name": "admin", "password": "admin123"},
{"name": "user", "secret_key": "user456"}
]
},
"api": {
"endpoints": ["api1", "api2"],
"encryption_settings": {
"enabled": True,
"secret": "api_secret"
}
}
}
}
result = mask(data)
assert result["config"]["database"]["host"] == "localhost"
assert result["config"]["database"]["password"] == "***"
assert result["config"]["database"]["users"][0]["name"] == "admin"
assert result["config"]["database"]["users"][0]["password"] == "***"
assert result["config"]["database"]["users"][1]["name"] == "user"
assert result["config"]["database"]["users"][1]["secret_key"] == "***"
assert result["config"]["api"]["endpoints"] == ["api1", "api2"]
assert result["config"]["api"]["encryption_settings"]["enabled"] is True
assert result["config"]["api"]["encryption_settings"]["secret"] == "***"
def test_mask_preserves_original_data():
"""Test that original data is not modified"""
original_data = {
"password": "secret123",
"username": "john_doe"
}
data_copy = original_data.copy()
result = mask(original_data)
assert original_data == data_copy # Original unchanged
assert result != original_data # Result is different
assert result["password"] == "***"
assert original_data["password"] == "secret123"
@pytest.mark.parametrize("mask_key,expected_keys", [
(["pass"], ["password", "user_pass", "my_pass_key"]),
(["key"], ["api_key", "secret_key", "my_key_value"]),
(["token"], ["token", "auth_token", "my_token_here"]),
])
def test_mask_parametrized_keys(mask_key: list[str], expected_keys: list[str]):
"""Parametrized test for different mask key patterns"""
data = {key: "value" for key in expected_keys}
data["normal_entry"] = "normal_value"
result = mask(data, mask_keys=mask_key)
for key in expected_keys:
assert result[key] == "***"
assert result["normal_entry"] == "normal_value"

2
uv.lock generated
View File

@@ -44,7 +44,7 @@ wheels = [
[[package]]
name = "corelibs"
version = "0.13.2"
version = "0.18.0"
source = { editable = "." }
dependencies = [
{ name = "jmespath" },