CoreLibs for Python

This is an initial test install.
The folder and file names will differ and some things will move around.

1.0.0 will have the first usable setup.
This commit is contained in:
Clemens Schwaighofer
2025-07-01 15:05:32 +09:00
commit e778e7a42f
29 changed files with 1354 additions and 0 deletions

5
.gitignore vendored Normal file

@@ -0,0 +1,5 @@
.venv/
**/__pycache__/*
**/*.egg-info
.mypy_cache/
**/.env

19
ReadMe.md Normal file

@@ -0,0 +1,19 @@
# CoreLibs for Python
This is a pip package that can be installed into any project and covers the following parts:
- logging update with exception logs
- requests wrapper for easier auth pass on access
- dict fingerprinting
- jmespath search
- dump outputs for data
- progress printing
## Python venv setup
In the folder where the script will be located:
```sh
uv venv --python 3.13
```
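
Once installed, the module paths visible in this commit (`debug/timer.py`, `logging/log.py`) can be combined as in this minimal sketch; the file and logger names are placeholders:

```py
from pathlib import Path

from CoreLibs.debug.timer import Timer
from CoreLibs.logging.log import Log

log = Log(Path("run.log"), "my-script")  # placeholder log file and name
timer = Timer()
# ... do some work ...
log.logger.info("elapsed: %s", timer.run_time())
```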

48
pyproject.toml Normal file

@@ -0,0 +1,48 @@
# MARK: Project info
[project]
name = "corelibs-python"
version = "0.1.0"
description = "Collection of utils for Python scripts"
readme = "ReadMe.md"
requires-python = ">=3.13"
dependencies = [
"jmespath>=1.0.1",
"psutil>=7.0.0",
]
# set this to disable publish to pypi (pip)
# classifiers = ["Private :: Do Not Upload"]
# MARK: build target
[[tool.uv.index]]
name = "egra-gitea"
url = "https://git.egplusww.jp/org/PyPI/dashboard"
publish-url = "https://git.egplusww.jp/api/packages/PyPI/pypi"
explicit = true
# MARK: build system
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# MARK: Python tools
[tool.pyright]
typeCheckingMode = "strict"
reportMissingImports = "information"
reportMissingTypeStubs = "information"
reportUnknownMemberType = "information"
[tool.ruff]
line-length = 120
[tool.black]
# set 10 shorter than the other tools for better formatting
line-length = 110
# to avoid " ... " " ... " string sets
# experimental-string-processing = true
preview = true
enable-unstable-feature = ["string_processing"]
[tool.pylint.format]
max-line-length = 120
[tool.pylint.miscellaneous]
notes = ["FIXME", "TODO"]
notes-rgx = '(FIXME|TODO)(\((TTD-|#)[0-9]+\))'
[tool.flake8]
max-line-length = 120

0
src/CoreLibs/__init__.py Normal file

@@ -0,0 +1,91 @@
"""
Write to CSV file
- each class set is one file write with one header set
"""
from typing import Any
from pathlib import Path
from collections import Counter
import csv
class CsvWriter:
"""
write to a CSV file
"""
def __init__(
self,
path: Path,
file_name: str,
header: dict[str, str],
header_order: list[str] | None = None
):
self.path = path
self.file_name = file_name
# Key: index for write for the line dict, Values: header entries
self.header = header
self.csv_file_writer = self.__open_csv(header_order)
def __open_csv(self, header_order: list[str] | None) -> 'csv.DictWriter[str] | None':
"""
open csv file for writing, write headers
Note that if there is no header_order set we use the order in header dictionary
Arguments:
            header_order {list[str] | None} -- optional dedicated header order
        Returns:
            csv.DictWriter[str] | None: the writer, or None if the header is invalid or the file cannot be opened
"""
# if header order is set, make sure all header value fields exist
header_values = self.header.values()
if header_order is not None:
if Counter(header_values) != Counter(header_order):
print(
"header order does not match header values: "
f"{', '.join(header_values)} != {', '.join(header_order)}"
)
return None
header_values = header_order
# no duplicates
if len(header_values) != len(set(header_values)):
print(f"Header must have unique values only: {', '.join(header_values)}")
return None
try:
fp = open(
self.path.joinpath(self.file_name),
"w", encoding="utf-8"
)
csv_file_writer = csv.DictWriter(
fp,
fieldnames=header_values,
delimiter=",",
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)
csv_file_writer.writeheader()
return csv_file_writer
except OSError as err:
print("OS error:", err)
return None
def write_csv(self, line: dict[str, str]) -> bool:
"""
write member csv line
Arguments:
            line {dict[str, str]} -- row data keyed by the header dict keys
        Returns:
            bool -- True on write, False if the CSV writer could not be opened
"""
if self.csv_file_writer is None:
return False
csv_row: dict[str, Any] = {}
# only write entries that are in the header list
for key, value in self.header.items():
csv_row[value] = line[key]
self.csv_file_writer.writerow(csv_row)
return True
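
A minimal usage sketch for `CsvWriter`; the module path is not visible in this view, so the import is omitted, and the path, file name and header mapping are placeholders:

```py
from pathlib import Path

# header maps row-dict keys to CSV column names
writer = CsvWriter(
    path=Path("."),
    file_name="members.csv",
    header={"uid": "User ID", "mail": "E-Mail"},
)
writer.write_csv({"uid": "1001", "mail": "user@example.com"})
```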

@@ -0,0 +1,126 @@
"""
Profile memory usage in Python
"""
# https://docs.python.org/3/library/tracemalloc.html
import os
import time
import tracemalloc
import linecache
from typing import Tuple
from tracemalloc import Snapshot
import psutil
def display_top(snapshot: Snapshot, key_type: str = 'lineno', limit: int = 10) -> str:
"""
    Print tracemalloc stats
https://docs.python.org/3/library/tracemalloc.html#pretty-top
Args:
snapshot (Snapshot): _description_
key_type (str, optional): _description_. Defaults to 'lineno'.
limit (int, optional): _description_. Defaults to 10.
"""
snapshot = snapshot.filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, "<unknown>"),
))
top_stats = snapshot.statistics(key_type)
profiler_msg = f"Top {limit} lines"
for index, stat in enumerate(top_stats[:limit], 1):
frame = stat.traceback[0]
# replace "/path/to/module/file.py" with "module/file.py"
filename = os.sep.join(frame.filename.split(os.sep)[-2:])
profiler_msg += f"#{index}: {filename}:{frame.lineno}: {(stat.size / 1024):.1f} KiB"
line = linecache.getline(frame.filename, frame.lineno).strip()
if line:
profiler_msg += f" {line}"
other = top_stats[limit:]
if other:
size = sum(stat.size for stat in other)
profiler_msg += f"{len(other)} other: {(size / 1024):.1f} KiB"
total = sum(stat.size for stat in top_stats)
profiler_msg += f"Total allocated size: {(total / 1024):.1f} KiB"
return profiler_msg
class Profiling:
"""
Profile memory usage and elapsed time for some block
Based on: https://stackoverflow.com/a/53301648
"""
def __init__(self):
# profiling id
self.__ident: str = ''
# memory
self.__rss_before: int = 0
self.__vms_before: int = 0
# self.shared_before: int = 0
self.__rss_used: int = 0
self.__vms_used: int = 0
# self.shared_used: int = 0
# time
self.__call_start: float = 0
        self.__elapsed: str = ''
def __get_process_memory(self) -> Tuple[int, int]:
process = psutil.Process(os.getpid())
mi = process.memory_info()
# macos does not have mi.shared
return mi.rss, mi.vms
def __elapsed_since(self) -> str:
elapsed = time.time() - self.__call_start
if elapsed < 1:
return str(round(elapsed * 1000, 2)) + "ms"
if elapsed < 60:
return str(round(elapsed, 2)) + "s"
if elapsed < 3600:
return str(round(elapsed / 60, 2)) + "min"
return str(round(elapsed / 3600, 2)) + "hrs"
def __format_bytes(self, bytes_data: int) -> str:
if abs(bytes_data) < 1000:
return str(bytes_data) + "B"
if abs(bytes_data) < 1e6:
return str(round(bytes_data / 1e3, 2)) + "kB"
if abs(bytes_data) < 1e9:
return str(round(bytes_data / 1e6, 2)) + "MB"
return str(round(bytes_data / 1e9, 2)) + "GB"
def start_profiling(self, ident: str) -> None:
"""
start the profiling
"""
self.__ident = ident
self.__rss_before, self.__vms_before = self.__get_process_memory()
self.__call_start = time.time()
def end_profiling(self) -> None:
"""
end the profiling
"""
if self.__rss_before == 0 and self.__vms_before == 0:
print("start_profile() was not called, output will be negative")
self.__elapsed = self.__elapsed_since()
        rss_after, vms_after = self.__get_process_memory()
        self.__rss_used = rss_after - self.__rss_before
        self.__vms_used = vms_after - self.__vms_before
def print_profiling(self) -> str:
"""
print the profiling time
"""
return (
f"Profiling: {self.__ident:>20} "
f"RSS: {self.__format_bytes(self.__rss_used):>8} | "
f"VMS: {self.__format_bytes(self.__vms_used):>8} | "
f"time: {self.__elapsed:>8}"
)
# __END__
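
A hedged sketch of wrapping a block with `Profiling`; the workload and the sample output are placeholders:

```py
profiler = Profiling()
profiler.start_profiling("build list")
data = [x * x for x in range(1_000_000)]  # placeholder workload
profiler.end_profiling()
print(profiler.print_profiling())
# e.g. "Profiling:           build list RSS:  38.22MB | VMS:  38.05MB | time:  55.12ms"
```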

114
src/CoreLibs/debug/timer.py Normal file

@@ -0,0 +1,114 @@
"""
An interval timer class
Returns:
Timer: class timer for basic time run calculations
"""
from datetime import datetime, timedelta
class Timer:
"""
get difference between start and end date/time
"""
def __init__(self):
"""
init new start time and set end time to None
"""
self._overall_start_time = datetime.now()
self._overall_end_time = None
self._overall_run_time = None
self._start_time = datetime.now()
self._end_time = None
self._run_time = None
# MARK: overall run time
def overall_run_time(self) -> timedelta:
"""
overall run time difference from class launch to call of this function
Returns:
timedelta: _description_
"""
self._overall_end_time = datetime.now()
self._overall_run_time = self._overall_end_time - self._overall_start_time
return self._overall_run_time
def get_overall_start_time(self) -> datetime:
"""
get set start time
Returns:
datetime: _description_
"""
return self._overall_start_time
def get_overall_end_time(self) -> datetime | None:
"""
get set end time or None for not set
Returns:
datetime|None: _description_
"""
return self._overall_end_time
def get_overall_run_time(self) -> timedelta | None:
"""
get run time or None if run time was not called
Returns:
            timedelta | None: _description_
"""
return self._overall_run_time
# MARK: set run time
def run_time(self) -> timedelta:
"""
difference between start time and current time
Returns:
            timedelta: _description_
"""
self._end_time = datetime.now()
self._run_time = self._end_time - self._start_time
return self._run_time
def reset_run_time(self):
"""
        reset start/end and run time
"""
self._start_time = datetime.now()
self._end_time = None
self._run_time = None
def get_start_time(self) -> datetime:
"""
get set start time
Returns:
datetime: _description_
"""
return self._start_time
def get_end_time(self) -> datetime | None:
"""
get set end time or None for not set
Returns:
datetime|None: _description_
"""
return self._end_time
def get_run_time(self) -> timedelta | None:
"""
get run time or None if run time was not called
Returns:
            timedelta | None: _description_
"""
return self._run_time
# __END__
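
A short usage sketch for `Timer`; the interval clock can be reset while the overall clock keeps running:

```py
from time import sleep

from CoreLibs.debug.timer import Timer

timer = Timer()                  # both clocks start here
sleep(0.2)
print(timer.run_time())          # interval since start/reset (timedelta)
timer.reset_run_time()           # restart only the interval clock
sleep(0.1)
print(timer.run_time())          # ~0.1s
print(timer.overall_run_time())  # ~0.3s, since construction
```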

@@ -0,0 +1,75 @@
"""
Various small helpers for data writing
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from io import TextIOWrapper
def write_l(line: str, fpl: 'TextIOWrapper | None' = None, print_line: bool = False):
"""
Write a line to screen and to output file
Args:
        line (str): Line to write
        fpl (TextIOWrapper | None): file handle; if None, skip the file write
        print_line (bool): if True, also print the line to the console
"""
if print_line is True:
print(line)
if fpl is not None:
fpl.write(line + "\n")
# progress printers
def pr_header(tag: str, marker_string: str = '#', width: int = 35):
"""_summary_
Args:
tag (str): _description_
"""
print(f" {marker_string} {tag:^{width}} {marker_string}")
def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35):
"""_summary_
Args:
tag (str): _description_
prefix_string (str, optional): _description_. Defaults to '|'.
"""
print(f" {prefix_string} {tag:{space_filler}<{width}}:", flush=True)
def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35):
"""
    write progress open line with tag
Args:
tag (str): _description_
prefix_string (str): prefix string. Default: '|'
"""
print(f" {prefix_string} {tag:{space_filler}<{width}} [", end="", flush=True)
def pr_close(tag: str = ''):
"""
write the close tag with new line
Args:
tag (str, optional): _description_. Defaults to ''.
"""
print(f"{tag}]", flush=True)
def pr_act(act: str = "."):
"""
write progress character
Args:
act (str, optional): _description_. Defaults to ".".
"""
print(f"{act}", end="", flush=True)
# __END__
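
A sketch of the progress printers combined; the commented output is an assumption derived from the format strings above:

```py
pr_header("Sync Run")   # " # ...Sync Run... # " framed header
pr_open("load users")   # " | load users....................... ["
for _ in range(4):
    pr_act()            # one "." per processed item
pr_close()              # closes with "]" and a newline
```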

@@ -0,0 +1,44 @@
"""
crc handlers for file CRC
"""
import zlib
from pathlib import Path
def file_crc(file_path: Path) -> str:
"""
    Create a file CRC32 with a buffered read loop
Args:
        file_path (Path): _description_
Returns:
str: file crc32
"""
crc = 0
with open(file_path, 'rb', 65536) as ins:
for _ in range(int((file_path.stat().st_size / 65536)) + 1):
crc = zlib.crc32(ins.read(65536), crc)
return f"{crc & 0xFFFFFFFF:08X}"
def file_name_crc(file_path: Path, add_parent_folder: bool = False) -> str:
"""
either returns file name only from path
eg: /foo/bar/baz/file.csv will be file.csv
or
return the first parent path from path + file
eg: /foo/bar/baz/file.csv will be baz/file.csv
Args:
file_path (Path): _description_
add_parent_folder (bool, optional): _description_. Defaults to False.
Returns:
str: file name as string
"""
if add_parent_folder:
return str(Path(file_path.parent.name).joinpath(file_path.name))
else:
return file_path.name
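
A hedged usage sketch; the path is a placeholder and `file_crc` needs an existing file:

```py
from pathlib import Path

target = Path("export/data.csv")  # placeholder path
print(file_crc(target))           # 8 hex chars, e.g. "1C291CA3"
print(file_name_crc(target, add_parent_folder=True))  # "export/data.csv"
print(file_name_crc(target))                          # "data.csv"
```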

@@ -0,0 +1,46 @@
"""
File handling utilities
"""
import os
import shutil
from pathlib import Path
def remove_all_in_directory(directory: Path, ignore_files: list[str] | None = None, verbose: bool = False) -> bool:
"""
remove all files and folders in a directory
can exclude files or folders
Args:
directory (Path): _description_
        ignore_files (list[str], optional): file or folder names to skip. Defaults to None.
        verbose (bool, optional): print progress markers. Defaults to False.
Returns:
bool: _description_
"""
if not directory.is_dir():
return False
if ignore_files is None:
ignore_files = []
if verbose:
print(f"Remove old files in: {directory.name} [", end="", flush=True)
# remove all files and folders in given directory by recursive globbing
for file in directory.rglob("*"):
# skip if in ignore files
if file.name in ignore_files:
continue
# remove one file, or a whole directory
if file.is_file():
os.remove(file)
if verbose:
print(".", end="", flush=True)
elif file.is_dir():
shutil.rmtree(file)
if verbose:
print("/", end="", flush=True)
if verbose:
print("]", flush=True)
return True
# __END__

@@ -0,0 +1,35 @@
"""
helper functions for jmespath interfaces
"""
from typing import Any
import jmespath
import jmespath.exceptions
def jmespath_search(search_data: dict[Any, Any] | list[Any], search_params: str) -> Any:
"""
jmespath search wrapper
Args:
search_data (dict | list): _description_
search_params (str): _description_
Raises:
        ValueError: on jmespath.exceptions.LexerError
        ValueError: on jmespath.exceptions.ParseError
        ValueError: on TypeError from invalid search data or params
Returns:
Any: dict/list/etc, None if nothing found
"""
try:
search_result = jmespath.search(search_params, search_data)
except jmespath.exceptions.LexerError as excp:
raise ValueError(f"Compile failed: {search_params}: {excp}") from excp
except jmespath.exceptions.ParseError as excp:
raise ValueError(f"Parse failed: {search_params}: {excp}") from excp
except TypeError as excp:
raise ValueError(f"Type error for search_params: {excp}") from excp
return search_result
# __END__
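
A short sketch of the wrapper; a bad expression surfaces as `ValueError` instead of jmespath's own exceptions (the data is a placeholder):

```py
data = {"users": [
    {"name": "ann", "active": True},
    {"name": "bob", "active": False},
]}
print(jmespath_search(data, "users[?active].name"))  # ['ann']
try:
    jmespath_search(data, "users[?")  # malformed expression
except ValueError as err:
    print(err)
```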

@@ -0,0 +1,31 @@
"""
json encoder for datetime
"""
from typing import Any
from json import JSONEncoder
from datetime import datetime, date
# subclass JSONEncoder
class DateTimeEncoder(JSONEncoder):
"""
Override the default method
    usage: json.dumps(data, cls=DateTimeEncoder)
"""
def default(self, o: Any) -> str | None:
if isinstance(o, (date, datetime)):
return o.isoformat()
return None
def default(obj: Any) -> str | None:
"""
default override
    usage: json.dumps(data, default=default)
"""
if isinstance(obj, (date, datetime)):
return obj.isoformat()
return None
# __END__
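
Both variants plug into `json.dumps`; note that other non-serializable objects come out as `null` here instead of raising:

```py
import json
from datetime import datetime

payload = {"ts": datetime(2025, 7, 1, 15, 5, 32)}
print(json.dumps(payload, cls=DateTimeEncoder))  # {"ts": "2025-07-01T15:05:32"}
print(json.dumps(payload, default=default))      # same output
```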

@@ -0,0 +1,128 @@
"""
wrapper around search path
"""
from typing import Any
def array_search(
search_params: list[dict[str, str | bool | list[str | None]]],
data: list[dict[str, Any]],
return_index: bool = False
) -> list[dict[str, Any]]:
"""
search in an array of dicts with an array of Key/Value set
all Key/Value sets must match
Value set can be list for OR match
    option: case_sensitive: default True
Args:
search_params (list): List of search params in "Key"/"Value" lists with options
data (list): data to search in, must be a list
return_index (bool): return index of list [default False]
Raises:
ValueError: if search params is not a list
KeyError: if Key or Value are missing in search params
KeyError: More than one Key with the same name set
Returns:
list: list of found elements, or if return index
        list of dicts with "index" and "data", where "data" holds the result list
"""
if not isinstance(search_params, list): # type: ignore
raise ValueError("search_params must be a list")
keys = []
for search in search_params:
if not search.get('Key') or not search.get('Value'):
raise KeyError(
f"Either Key '{search.get('Key', '')}' or "
f"Value '{search.get('Value', '')}' is missing or empty"
)
        # if double key -> abort
        if search.get("Key") in keys:
            raise KeyError(
                f"Key {search.get('Key', '')} already exists in search_params"
            )
        keys.append(search.get("Key"))
return_items: list[dict[str, Any]] = []
for si_idx, search_item in enumerate(data):
# for each search entry, all must match
matching = 0
for search in search_params:
# either Value direct or if Value is list then any of those items can match
# values are compared in lower case if case senstive is off
# lower case left side
# TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"]
if search.get("case_sensitive", True) is False:
search_value = search_item.get(str(search['Key']), "").lower()
else:
search_value = search_item.get(str(search['Key']), "")
# lower case right side
if isinstance(search['Value'], list):
search_in = [
str(k).lower()
if search.get("case_sensitive", True) is False else k
for k in search['Value']
]
elif search.get("case_sensitive", True) is False:
search_in = str(search['Value']).lower()
else:
search_in = search['Value']
# compare check
if (
(
isinstance(search_in, list) and
search_value in search_in
) or
search_value == search_in
):
matching += 1
if len(search_params) == matching:
if return_index is True:
# the data is now in "data sub set"
return_items.append({
"index": si_idx,
"data": search_item
})
else:
return_items.append(search_item)
# return all found or empty list
return return_items
def key_lookup(haystack: dict[str, str], key: str) -> str:
"""
    simple key lookup in haystack, returns empty string if not found
Args:
haystack (dict[str, str]): _description_
key (str): _description_
Returns:
str: _description_
"""
return haystack.get(key, "")
def value_lookup(haystack: dict[str, str], value: str, raise_on_many: bool = False) -> str:
"""
find by value, if not found returns empty, if not raise on many returns the first one
Args:
haystack (dict[str, str]): _description_
value (str): _description_
raise_on_many (bool, optional): _description_. Defaults to False.
Raises:
ValueError: _description_
Returns:
str: _description_
"""
keys = [__key for __key, __value in haystack.items() if __value == value]
if not keys:
return ""
if raise_on_many is True and len(keys) > 1:
raise ValueError("More than one element found with the same name")
return keys[0]
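
A sketch of an OR-match on one key with case folding off; the records are placeholders:

```py
records = [
    {"name": "Ann", "role": "admin"},
    {"name": "bob", "role": "user"},
]
hits = array_search(
    [{"Key": "name", "Value": ["ann", "bob"], "case_sensitive": False}],
    records,
    return_index=True,
)
# -> [{'index': 0, 'data': {'name': 'Ann', ...}}, {'index': 1, 'data': {'name': 'bob', ...}}]
```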

@@ -0,0 +1,21 @@
"""
dict dump as JSON formatted
"""
import json
from typing import Any
def dump_data(data: dict[Any, Any] | list[Any] | str | None) -> str:
"""
    dump formatted output from dict/list
Args:
data (dict | list | str): _description_
Returns:
str: _description_
"""
return json.dumps(data, indent=4, ensure_ascii=False, default=str)
# __END__
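
A one-liner in practice; `default=str` keeps non-JSON types like datetime from raising:

```py
from datetime import datetime

print(dump_data({"id": 1, "when": datetime(2025, 7, 1)}))
# {
#     "id": 1,
#     "when": "2025-07-01 00:00:00"
# }
```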

@@ -0,0 +1,37 @@
"""
Various dictionary, object and list hashers
"""
import json
import hashlib
from typing import Any
def dict_hash_frozen(data: dict[Any, Any]) -> int:
"""
hash a dict via freeze
Args:
data (dict): _description_
Returns:
        int: _description_
"""
return hash(frozenset(data.items()))
def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str:
"""
Create a sha256 hash over dict
alternative for
dict_hash_frozen
Args:
data (dict | list): _description_
Returns:
str: _description_
"""
return hashlib.sha256(
json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8')
).hexdigest()
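
The two hashers trade speed for stability; a quick comparison (all values must be hashable for the frozen variant):

```py
cfg = {"host": "example.com", "port": 443}
print(dict_hash_frozen(cfg))  # int; differs between runs (str hash randomization)
print(dict_hash_crc(cfg))     # sha256 hex digest; stable across runs and machines
```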

@@ -0,0 +1,59 @@
"""
Various helper functions for type data clean up
"""
from typing import Any, cast
def delete_keys_from_set(
set_data: dict[str, Any] | list[Any] | Any, keys: list[str]
) -> dict[str, Any] | list[Any] | Any:
"""
remove all keys from set_data
Args:
set_data (dict[str, Any] | list[Any] | None): _description_
keys (list[str]): _description_
Returns:
dict[str, Any] | list[Any] | None: _description_
"""
# skip everything if there is no keys list
if not keys:
return set_data
if isinstance(set_data, dict):
for key, value in set_data.copy().items():
if key in keys:
del set_data[key]
if isinstance(value, (dict, list)):
delete_keys_from_set(value, keys)
elif isinstance(set_data, list):
for value in set_data:
if isinstance(value, (dict, list)):
delete_keys_from_set(value, keys)
return set_data
def build_dict(
any_dict: Any, ignore_entries: list[str] | None = None
) -> dict[str, Any | list[Any] | dict[Any, Any]]:
"""
    rewrite any AWS *TypeDef to a new dict so we can add/change entries
Args:
        any_dict (Any): _description_
        ignore_entries (list[str] | None, optional): keys to drop, including nested ones. Defaults to None.
Returns:
dict[str, Any | list[Any]]: _description_
"""
if ignore_entries is None:
return cast(dict[str, Any | list[Any] | dict[Any, Any]], any_dict)
# ignore entries can be one key or key nested
# return {
# key: value for key, value in any_dict.items() if key not in ignore_entries
# }
return cast(
dict[str, Any | list[Any] | dict[Any, Any]],
delete_keys_from_set(any_dict, ignore_entries)
)
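
A hedged sketch with an AWS-style response; the payload shape is a placeholder:

```py
raw = {
    "Name": "bucket-a",
    "ResponseMetadata": {"RequestId": "abc"},
    "Tags": [{"Key": "env", "ResponseMetadata": {"RequestId": "def"}}],
}
clean = build_dict(raw, ignore_entries=["ResponseMetadata"])
print(clean)  # {'Name': 'bucket-a', 'Tags': [{'Key': 'env'}]}
```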

@@ -0,0 +1,89 @@
"""
Collect error and warning messages as JSON blocks into an array
"""
from typing import Any
class ErrorMessage:
"""
Error and Warning collector
"""
# errors and warning
__error_list: list[dict[str, Any]] = []
__warning_list: list[dict[str, Any]] = []
def reset_warnings(self):
"""
reset warnings
"""
ErrorMessage.__warning_list = []
def add_warning(self, message: dict[str, Any], base_message: dict[str, Any] | None = None):
"""
add one warning
Args:
message (dict): _description_
base_message (dict, optional): _description_. Defaults to {}.
"""
if base_message is None or not isinstance(base_message, dict): # type: ignore
base_message = {}
base_message['level'] = "Warning"
ErrorMessage.__warning_list.append(base_message | message)
def get_warnings(self) -> list[dict[str, Any]]:
"""_summary_
Returns:
list: _description_
"""
return ErrorMessage.__warning_list
def has_warnings(self) -> bool:
"""
        check if there were warnings
Returns:
bool: _description_
"""
return bool(ErrorMessage.__warning_list)
def reset_errors(self):
"""
reset the error list
"""
ErrorMessage.__error_list = []
    def add_error(self, message: dict[str, Any], base_message: dict[str, Any] | None = None):
        """
        add one error
        Args:
            message (dict): _description_
            base_message (dict, optional): _description_. Defaults to {}.
        """
        if base_message is None or not isinstance(base_message, dict):  # type: ignore
            base_message = {}
        base_message['level'] = "Error"
        ErrorMessage.__error_list.append(base_message | message)
def get_errors(self) -> list[dict[str, Any]]:
"""_summary_
Returns:
list: _description_
"""
return ErrorMessage.__error_list
def has_errors(self) -> bool:
"""
        check if there were errors
Returns:
bool: _description_
"""
return bool(ErrorMessage.__error_list)
# __END__
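
Note that the lists are class attributes, so all instances share one collection; a short sketch with placeholder messages:

```py
errors = ErrorMessage()
errors.add_warning({"msg": "missing field"}, {"source": "import.csv"})
errors.add_error({"msg": "cannot open file"})
if errors.has_errors():
    print(errors.get_errors())
    # [{'level': 'Error', 'msg': 'cannot open file'}]
```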

120
src/CoreLibs/logging/log.py Normal file

@@ -0,0 +1,120 @@
"""
A log handler wrapper
"""
import logging.handlers
import logging
from pathlib import Path
from typing import Mapping
class Log:
"""
logger setup
"""
EXCEPTION: int = 60
def __init__(
self,
log_path: Path,
log_name: str,
log_level_console: str = 'WARNING',
log_level_file: str = 'DEBUG',
add_start_info: bool = True
):
logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
        # make sure the log file path ends in .log
        if log_path.suffix != '.log':
            log_path = log_path.with_suffix('.log')
# overall logger settings
self.logger = logging.getLogger(log_name)
# set maximum logging level for all logging output
self.logger.setLevel(logging.DEBUG)
# console logger
self.__console_handler(log_level_console)
# file logger
self.__file_handler(log_level_file, log_path)
        # if requested, write a start log entry
if add_start_info is True:
self.break_line('START')
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
return record.levelname != "EXCEPTION"
def __console_handler(self, log_level_console: str = 'WARNING'):
# console logger
if not isinstance(getattr(logging, log_level_console.upper(), None), int):
log_level_console = 'WARNING'
console_handler = logging.StreamHandler()
formatter_console = logging.Formatter(
(
'[%(asctime)s.%(msecs)03d] '
'[%(filename)s:%(funcName)s:%(lineno)d] '
'<%(levelname)s> '
'%(message)s'
),
datefmt="%Y-%m-%d %H:%M:%S",
)
console_handler.setLevel(log_level_console)
# do not show exceptions logs on console
console_handler.addFilter(self.__filter_exceptions)
console_handler.setFormatter(formatter_console)
self.logger.addHandler(console_handler)
def __file_handler(self, log_level_file: str, log_path: Path) -> None:
# file logger
if not isinstance(getattr(logging, log_level_file.upper(), None), int):
log_level_file = 'DEBUG'
file_handler = logging.handlers.TimedRotatingFileHandler(
filename=log_path,
encoding="utf-8",
when="D",
interval=1
)
formatter_file_handler = logging.Formatter(
(
'[%(asctime)s.%(msecs)03d] '
'[%(pathname)s:%(funcName)s:%(lineno)d] '
'[%(name)s:%(process)d] '
'<%(levelname)s> '
'%(message)s'
),
datefmt="%Y-%m-%dT%H:%M:%S",
)
file_handler.setLevel(log_level_file)
file_handler.setFormatter(formatter_file_handler)
self.logger.addHandler(file_handler)
def break_line(self, info: str = "BREAK"):
"""
add a break line as info level
Keyword Arguments:
info {str} -- _description_ (default: {"BREAK"})
"""
self.logger.info("[%s] ================================>", info)
def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
"""
        log on exception level
Args:
msg (object): _description_
*args (object): arguments for msg
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
"""
self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)
def validate_log_level(self, log_level: str) -> bool:
"""
        if the log level is invalid, will return False
Args:
log_level (str): _description_
Returns:
bool: _description_
"""
return isinstance(getattr(logging, log_level.upper(), None), int)
# __END__
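
A usage sketch; exception logs go to the file with traceback but are filtered from the console handler (file and logger names are placeholders):

```py
from pathlib import Path

from CoreLibs.logging.log import Log

log = Log(Path("script.log"), "my-script", log_level_console="INFO")
log.logger.info("processing %d items", 42)
try:
    1 / 0
except ZeroDivisionError:
    log.exception("division failed")  # file only, with traceback
```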

@@ -0,0 +1,190 @@
"""
requests lib interface
V2 call type
"""
from typing import Any
import warnings
import requests
# to hide the verify warnings because of the bad SSL settings from Netskope, Akamai, etc
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
class Caller:
"""_summary_"""
def __init__(
self,
header: dict[str, str],
verify: bool = True,
timeout: int = 20,
proxy: dict[str, str] | None = None
):
self.headers = header
self.timeout: int = timeout
self.cafile = "/Library/Application Support/Netskope/STAgent/data/nscacert.pem"
self.verify = verify
self.proxy = proxy
def __timeout(self, timeout: int | None) -> int:
if timeout is not None:
return timeout
return self.timeout
def __call(
self,
action: str,
url: str,
data: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
timeout: int | None = None
) -> requests.Response | None:
"""
call wrapper, on error returns None
Args:
action (str): _description_
url (str): _description_
data (dict | None): _description_. Defaults to None.
params (dict | None): _description_. Defaults to None.
Returns:
requests.Response | None: _description_
"""
if data is None:
data = {}
try:
response = None
if action == "get":
response = requests.get(
url,
params=params,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
)
elif action == "post":
response = requests.post(
url,
params=params,
json=data,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
)
elif action == "put":
response = requests.put(
url,
params=params,
json=data,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
)
elif action == "patch":
response = requests.patch(
url,
params=params,
json=data,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
)
elif action == "delete":
response = requests.delete(
url,
params=params,
headers=self.headers,
timeout=self.__timeout(timeout),
verify=self.verify,
proxies=self.proxy
)
return response
except requests.exceptions.InvalidSchema as e:
print(f"Invalid URL during '{action}' for {url}:\n\t{e}")
return None
except requests.exceptions.ReadTimeout as e:
print(f"Timeout ({self.timeout}s) during '{action}' for {url}:\n\t{e}")
return None
except requests.exceptions.ConnectionError as e:
print(f"Connection error during '{action}' for {url}:\n\t{e}")
return None
def get(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
"""
get data
Args:
url (str): _description_
params (dict | None): _description_
Returns:
requests.Response: _description_
"""
return self.__call('get', url, params=params)
def post(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
) -> requests.Response | None:
"""
post data
Args:
url (str): _description_
data (dict | None): _description_
params (dict | None): _description_
Returns:
requests.Response | None: _description_
"""
return self.__call('post', url, data, params)
def put(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
) -> requests.Response | None:
"""_summary_
Args:
url (str): _description_
data (dict | None): _description_
params (dict | None): _description_
Returns:
requests.Response | None: _description_
"""
return self.__call('put', url, data, params)
def patch(
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
) -> requests.Response | None:
"""_summary_
Args:
url (str): _description_
data (dict | None): _description_
params (dict | None): _description_
Returns:
requests.Response | None: _description_
"""
return self.__call('patch', url, data, params)
def delete(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
"""
delete
Args:
url (str): _description_
params (dict | None): _description_
Returns:
requests.Response | None: _description_
"""
return self.__call('delete', url, params=params)
# __END__
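
A sketch against a placeholder endpoint and token; all verbs share the stored headers, timeout, verify and proxy settings:

```py
api = Caller(header={"Authorization": "Bearer <token>"}, timeout=10)
response = api.get("https://api.example.com/v1/users", params={"page": 1})
if response is not None and response.ok:
    print(response.json())
```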

@@ -0,0 +1,34 @@
"""
String helpers
"""
from textwrap import shorten
def shorten_string(string: str, length: int, hard_shorten: bool = False, placeholder: str = " [~]") -> str:
"""
check if entry is too long and cut it, but only for console output
Note that if there are no spaces in the string, it will automatically use the hard split mode
Args:
string (str): _description_
length (int): _description_
        hard_shorten (bool): if shortening should be done at a fixed string length. Default: False
placeholder (str): placeholder string. Default: " [~]"
Returns:
str: _description_
"""
length = int(length)
string = str(string)
if len(string) > length:
if hard_shorten is True or " " not in string:
short_string = f"{string[:(length - len(placeholder))]}{placeholder}"
else:
short_string = shorten(string, width=length, placeholder=placeholder)
else:
short_string = string
return short_string
# __END__
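
Soft vs hard shortening on a sample string; hard mode cuts at a fixed length, soft mode cuts at word boundaries:

```py
text = "The quick brown fox jumps over the lazy dog"
print(shorten_string(text, 20))                     # "The quick brown [~]"
print(shorten_string(text, 20, hard_shorten=True))  # "The quick brown  [~]"
```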

42
uv.lock generated Normal file

@@ -0,0 +1,42 @@
version = 1
revision = 2
requires-python = ">=3.13"
[[package]]
name = "corelibs-python"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "jmespath" },
{ name = "psutil" },
]
[package.metadata]
requires-dist = [
{ name = "jmespath", specifier = ">=1.0.1" },
{ name = "psutil", specifier = ">=7.0.0" },
]
[[package]]
name = "jmespath"
version = "1.0.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843, upload-time = "2022-06-17T18:00:12.224Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" },
]
[[package]]
name = "psutil"
version = "7.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003, upload-time = "2025-02-13T21:54:07.946Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051, upload-time = "2025-02-13T21:54:12.36Z" },
{ url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535, upload-time = "2025-02-13T21:54:16.07Z" },
{ url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004, upload-time = "2025-02-13T21:54:18.662Z" },
{ url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986, upload-time = "2025-02-13T21:54:21.811Z" },
{ url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544, upload-time = "2025-02-13T21:54:24.68Z" },
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" },
]