commit e778e7a42f606c97229a447e945405a9f46a5d35
Author: Clemens Schwaighofer
Date:   Tue Jul 1 15:05:32 2025 +0900

    CoreLibs for Python

    This is an initial test install. The folder and file names will differ and some things will move around.
    1.0.0 will have the first usable setup.

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ad202a0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+.venv/
+**/__pycache__/*
+**/*.egg-info
+.mypy_cache/
+**/.env
diff --git a/ReadMe.md b/ReadMe.md
new file mode 100644
index 0000000..be75417
--- /dev/null
+++ b/ReadMe.md
@@ -0,0 +1,19 @@
+# CoreLibs for Python
+
+This is a pip package that can be installed into any project and covers the following parts:
+
+- logging wrapper with exception logs
+- requests wrapper for easier auth pass-through on access
+- dict fingerprinting
+- jmespath search
+- dump outputs for data
+- progress printing
+
+## Python venv setup
+
+In the folder where the script will be located:
+
+```sh
+uv venv --python 3.13
+```
+
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..9b83a34
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,48 @@
+# MARK: Project info
+[project]
+name = "corelibs-python"
+version = "0.1.0"
+description = "Collection of utils for Python scripts"
+readme = "ReadMe.md"
+requires-python = ">=3.13"
+dependencies = [
+    "jmespath>=1.0.1",
+    "psutil>=7.0.0",
+]
+# set this to disable publish to pypi (pip)
+# classifiers = ["Private :: Do Not Upload"]
+
+# MARK: build target
+[[tool.uv.index]]
+name = "egra-gitea"
+url = "https://git.egplusww.jp/org/PyPI/dashboard"
+publish-url = "https://git.egplusww.jp/api/packages/PyPI/pypi"
+explicit = true
+
+# MARK: build system
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+# MARK: Python tools
+[tool.pyright]
+typeCheckingMode = "strict"
+reportMissingImports = "information"
+reportMissingTypeStubs = "information"
+reportUnknownMemberType = "information"
+[tool.ruff]
+line-length = 120
+[tool.black]
+# set 10 shorter for better formatting
+line-length = 110
+# to avoid " ... " " ... 
" string sets +# experimental-string-processing = true +preview = true +enable-unstable-feature = ["string_processing"] +[tool.pylint.format] +max-line-length = 120 +[tool.pylint.miscellaneous] +notes = ["FIXME", "TODO"] +notes-rgx = '(FIXME|TODO)(\((TTD-|#)\[0-9]+\))' +[tool.flake8] +max-line-length = 120 diff --git a/src/CoreLibs/__init__.py b/src/CoreLibs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/csv/__init__.py b/src/CoreLibs/csv/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/csv/csv_writer.py b/src/CoreLibs/csv/csv_writer.py new file mode 100644 index 0000000..ff4a529 --- /dev/null +++ b/src/CoreLibs/csv/csv_writer.py @@ -0,0 +1,91 @@ +""" +Write to CSV file +- each class set is one file write with one header set +""" + +from typing import Any +from pathlib import Path +from collections import Counter +import csv + + +class CsvWriter: + """ + write to a CSV file + """ + + def __init__( + self, + path: Path, + file_name: str, + header: dict[str, str], + header_order: list[str] | None = None + ): + self.path = path + self.file_name = file_name + # Key: index for write for the line dict, Values: header entries + self.header = header + self.csv_file_writer = self.__open_csv(header_order) + + def __open_csv(self, header_order: list[str] | None) -> 'csv.DictWriter[str] | None': + """ + open csv file for writing, write headers + + Note that if there is no header_order set we use the order in header dictionary + + Arguments: + line {list[str] | None} -- optional dedicated header order + + Returns: + csv.DictWriter[str] | None: _description_ + """ + # if header order is set, make sure all header value fields exist + header_values = self.header.values() + if header_order is not None: + if Counter(header_values) != Counter(header_order): + print( + "header order does not match header values: " + f"{', '.join(header_values)} != {', '.join(header_order)}" + ) + return None + header_values = header_order + # no duplicates + if len(header_values) != len(set(header_values)): + print(f"Header must have unique values only: {', '.join(header_values)}") + return None + try: + fp = open( + self.path.joinpath(self.file_name), + "w", encoding="utf-8" + ) + csv_file_writer = csv.DictWriter( + fp, + fieldnames=header_values, + delimiter=",", + quotechar='"', + quoting=csv.QUOTE_MINIMAL, + ) + csv_file_writer.writeheader() + return csv_file_writer + except OSError as err: + print("OS error:", err) + return None + + def write_csv(self, line: dict[str, str]) -> bool: + """ + write member csv line + + Arguments: + line {dict[str, str]} -- _description_ + + Returns: + bool -- _description_ + """ + if self.csv_file_writer is None: + return False + csv_row: dict[str, Any] = {} + # only write entries that are in the header list + for key, value in self.header.items(): + csv_row[value] = line[key] + self.csv_file_writer.writerow(csv_row) + return True diff --git a/src/CoreLibs/debug/__init__.py b/src/CoreLibs/debug/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/debug/profiling.py b/src/CoreLibs/debug/profiling.py new file mode 100644 index 0000000..75f80da --- /dev/null +++ b/src/CoreLibs/debug/profiling.py @@ -0,0 +1,126 @@ +""" +Profile memory usage in Python +""" + +# https://docs.python.org/3/library/tracemalloc.html + +import os +import time +import tracemalloc +import linecache +from typing import Tuple +from tracemalloc import Snapshot +import psutil + + +def display_top(snapshot: Snapshot, key_type: 
str = 'lineno', limit: int = 10) -> str:
+    """
+    Return tracemalloc stats formatted as a string
+    https://docs.python.org/3/library/tracemalloc.html#pretty-top
+
+    Args:
+        snapshot (Snapshot): _description_
+        key_type (str, optional): _description_. Defaults to 'lineno'.
+        limit (int, optional): _description_. Defaults to 10.
+    """
+    snapshot = snapshot.filter_traces((
+        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
+        tracemalloc.Filter(False, "<unknown>"),
+    ))
+    top_stats = snapshot.statistics(key_type)
+
+    profiler_msg = f"Top {limit} lines"
+    for index, stat in enumerate(top_stats[:limit], 1):
+        frame = stat.traceback[0]
+        # replace "/path/to/module/file.py" with "module/file.py"
+        filename = os.sep.join(frame.filename.split(os.sep)[-2:])
+        profiler_msg += f"\n#{index}: {filename}:{frame.lineno}: {(stat.size / 1024):.1f} KiB"
+        line = linecache.getline(frame.filename, frame.lineno).strip()
+        if line:
+            profiler_msg += f" {line}"
+
+    other = top_stats[limit:]
+    if other:
+        size = sum(stat.size for stat in other)
+        profiler_msg += f"\n{len(other)} other: {(size / 1024):.1f} KiB"
+    total = sum(stat.size for stat in top_stats)
+    profiler_msg += f"\nTotal allocated size: {(total / 1024):.1f} KiB"
+    return profiler_msg
+
+
+class Profiling:
+    """
+    Profile memory usage and elapsed time for some block
+    Based on: https://stackoverflow.com/a/53301648
+    """
+
+    def __init__(self):
+        # profiling id
+        self.__ident: str = ''
+        # memory
+        self.__rss_before: int = 0
+        self.__vms_before: int = 0
+        # self.shared_before: int = 0
+        self.__rss_used: int = 0
+        self.__vms_used: int = 0
+        # self.shared_used: int = 0
+        # time
+        self.__call_start: float = 0
+        self.__elapsed: str = ''
+
+    def __get_process_memory(self) -> Tuple[int, int]:
+        process = psutil.Process(os.getpid())
+        mi = process.memory_info()
+        # macos does not have mi.shared
+        return mi.rss, mi.vms
+
+    def __elapsed_since(self) -> str:
+        elapsed = time.time() - self.__call_start
+        if elapsed < 1:
+            return str(round(elapsed * 1000, 2)) + "ms"
+        if elapsed < 60:
+            return str(round(elapsed, 2)) + "s"
+        if elapsed < 3600:
+            return str(round(elapsed / 60, 2)) + "min"
+        return str(round(elapsed / 3600, 2)) + "hrs"
+
+    def __format_bytes(self, bytes_data: int) -> str:
+        if abs(bytes_data) < 1000:
+            return str(bytes_data) + "B"
+        if abs(bytes_data) < 1e6:
+            return str(round(bytes_data / 1e3, 2)) + "kB"
+        if abs(bytes_data) < 1e9:
+            return str(round(bytes_data / 1e6, 2)) + "MB"
+        return str(round(bytes_data / 1e9, 2)) + "GB"
+
+    def start_profiling(self, ident: str) -> None:
+        """
+        start the profiling
+        """
+        self.__ident = ident
+        self.__rss_before, self.__vms_before = self.__get_process_memory()
+        self.__call_start = time.time()
+
+    def end_profiling(self) -> None:
+        """
+        end the profiling
+        """
+        if self.__rss_before == 0 and self.__vms_before == 0:
+            print("start_profiling() was not called, output will be negative")
+        self.__elapsed = self.__elapsed_since()
+        __rss_after, __vms_after = self.__get_process_memory()
+        self.__rss_used = __rss_after - self.__rss_before
+        self.__vms_used = __vms_after - self.__vms_before
+
+    def print_profiling(self) -> str:
+        """
+        return the profiling result as a formatted string
+        """
+        return (
+            f"Profiling: {self.__ident:>20} "
+            f"RSS: {self.__format_bytes(self.__rss_used):>8} | "
+            f"VMS: {self.__format_bytes(self.__vms_used):>8} | "
+            f"time: {self.__elapsed:>8}"
+        )
+
+# __END__
diff --git a/src/CoreLibs/debug/timer.py b/src/CoreLibs/debug/timer.py
new file mode 100644
index 0000000..a2c5d26
--- /dev/null
+++ b/src/CoreLibs/debug/timer.py
@@ -0,0 +1,114 @@
+"""
+an interval timer class
+
+Returns:
+ Timer: class timer for basic time run calculations +""" + +from datetime import datetime, timedelta + + +class Timer: + """ + get difference between start and end date/time + """ + + def __init__(self): + """ + init new start time and set end time to None + """ + self._overall_start_time = datetime.now() + self._overall_end_time = None + self._overall_run_time = None + self._start_time = datetime.now() + self._end_time = None + self._run_time = None + + # MARK: overall run time + def overall_run_time(self) -> timedelta: + """ + overall run time difference from class launch to call of this function + + Returns: + timedelta: _description_ + """ + self._overall_end_time = datetime.now() + self._overall_run_time = self._overall_end_time - self._overall_start_time + return self._overall_run_time + + def get_overall_start_time(self) -> datetime: + """ + get set start time + + Returns: + datetime: _description_ + """ + return self._overall_start_time + + def get_overall_end_time(self) -> datetime | None: + """ + get set end time or None for not set + + Returns: + datetime|None: _description_ + """ + return self._overall_end_time + + def get_overall_run_time(self) -> timedelta | None: + """ + get run time or None if run time was not called + + Returns: + datetime|None: _description_ + """ + return self._overall_run_time + + # MARK: set run time + def run_time(self) -> timedelta: + """ + difference between start time and current time + + Returns: + datetime: _description_ + """ + self._end_time = datetime.now() + self._run_time = self._end_time - self._start_time + return self._run_time + + def reset_run_time(self): + """ + reset start/end and run tine + """ + self._start_time = datetime.now() + self._end_time = None + self._run_time = None + + def get_start_time(self) -> datetime: + """ + get set start time + + Returns: + datetime: _description_ + """ + return self._start_time + + def get_end_time(self) -> datetime | None: + """ + get set end time or None for not set + + Returns: + datetime|None: _description_ + """ + return self._end_time + + def get_run_time(self) -> timedelta | None: + """ + get run time or None if run time was not called + + Returns: + datetime|None: _description_ + """ + return self._run_time + + +# __END__ diff --git a/src/CoreLibs/debug/writeline.py b/src/CoreLibs/debug/writeline.py new file mode 100644 index 0000000..cb2c6e2 --- /dev/null +++ b/src/CoreLibs/debug/writeline.py @@ -0,0 +1,75 @@ +""" +Various small helpers for data writing +""" + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from io import TextIOWrapper + + +def write_l(line: str, fpl: 'TextIOWrapper | None' = None, print_line: bool = False): + """ + Write a line to screen and to output file + + Args: + line (String): Line to write + fpl (Resource): file handler resource, if none write only to console + """ + if print_line is True: + print(line) + if fpl is not None: + fpl.write(line + "\n") + + +# progress printers + +def pr_header(tag: str, marker_string: str = '#', width: int = 35): + """_summary_ + + Args: + tag (str): _description_ + """ + print(f" {marker_string} {tag:^{width}} {marker_string}") + + +def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35): + """_summary_ + + Args: + tag (str): _description_ + prefix_string (str, optional): _description_. Defaults to '|'. 
+ """ + print(f" {prefix_string} {tag:{space_filler}<{width}}:", flush=True) + + +def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35): + """ + writen progress open line with tag + + Args: + tag (str): _description_ + prefix_string (str): prefix string. Default: '|' + """ + print(f" {prefix_string} {tag:{space_filler}<{width}} [", end="", flush=True) + + +def pr_close(tag: str = ''): + """ + write the close tag with new line + + Args: + tag (str, optional): _description_. Defaults to ''. + """ + print(f"{tag}]", flush=True) + + +def pr_act(act: str = "."): + """ + write progress character + + Args: + act (str, optional): _description_. Defaults to ".". + """ + print(f"{act}", end="", flush=True) + +# __EMD__ diff --git a/src/CoreLibs/fileHandling/__init__.py b/src/CoreLibs/fileHandling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/fileHandling/file_crc.py b/src/CoreLibs/fileHandling/file_crc.py new file mode 100644 index 0000000..3bc2fa6 --- /dev/null +++ b/src/CoreLibs/fileHandling/file_crc.py @@ -0,0 +1,44 @@ +""" +crc handlers for file CRC +""" + +import zlib +from pathlib import Path + + +def file_crc(file_path: Path) -> str: + """ + With for loop and buffer, create file crc32 + + Args: + file_path (str | Path): _description_ + + Returns: + str: file crc32 + """ + crc = 0 + with open(file_path, 'rb', 65536) as ins: + for _ in range(int((file_path.stat().st_size / 65536)) + 1): + crc = zlib.crc32(ins.read(65536), crc) + return f"{crc & 0xFFFFFFFF:08X}" + + +def file_name_crc(file_path: Path, add_parent_folder: bool = False) -> str: + """ + either returns file name only from path + eg: /foo/bar/baz/file.csv will be file.csv + or + return the first parent path from path + file + eg: /foo/bar/baz/file.csv will be baz/file.csv + + Args: + file_path (Path): _description_ + add_parent_folder (bool, optional): _description_. Defaults to False. + + Returns: + str: file name as string + """ + if add_parent_folder: + return str(Path(file_path.parent.name).joinpath(file_path.name)) + else: + return file_path.name diff --git a/src/CoreLibs/fileHandling/file_handling.py b/src/CoreLibs/fileHandling/file_handling.py new file mode 100644 index 0000000..0672cfe --- /dev/null +++ b/src/CoreLibs/fileHandling/file_handling.py @@ -0,0 +1,46 @@ +""" +File handling utilities +""" + +import os +import shutil +from pathlib import Path + + +def remove_all_in_directory(directory: Path, ignore_files: list[str] | None = None, verbose: bool = False) -> bool: + """ + remove all files and folders in a directory + can exclude files or folders + + Args: + directory (Path): _description_ + ignore_files (list[str], optional): _description_. Defaults to None. 
+ + Returns: + bool: _description_ + """ + if not directory.is_dir(): + return False + if ignore_files is None: + ignore_files = [] + if verbose: + print(f"Remove old files in: {directory.name} [", end="", flush=True) + # remove all files and folders in given directory by recursive globbing + for file in directory.rglob("*"): + # skip if in ignore files + if file.name in ignore_files: + continue + # remove one file, or a whole directory + if file.is_file(): + os.remove(file) + if verbose: + print(".", end="", flush=True) + elif file.is_dir(): + shutil.rmtree(file) + if verbose: + print("/", end="", flush=True) + if verbose: + print("]", flush=True) + return True + +# __END__ diff --git a/src/CoreLibs/json/__init__.py b/src/CoreLibs/json/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/json/jmespath_helper.py b/src/CoreLibs/json/jmespath_helper.py new file mode 100644 index 0000000..2bb9e47 --- /dev/null +++ b/src/CoreLibs/json/jmespath_helper.py @@ -0,0 +1,35 @@ +""" +helper functions for jmespath interfaces +""" + +from typing import Any +import jmespath +import jmespath.exceptions + + +def jmespath_search(search_data: dict[Any, Any] | list[Any], search_params: str) -> Any: + """ + jmespath search wrapper + + Args: + search_data (dict | list): _description_ + search_params (str): _description_ + + Raises: + ValueError: jmespath.exceptions.LexerError + ValueError: jmespath.exceptions.ParseError + + Returns: + Any: dict/list/etc, None if nothing found + """ + try: + search_result = jmespath.search(search_params, search_data) + except jmespath.exceptions.LexerError as excp: + raise ValueError(f"Compile failed: {search_params}: {excp}") from excp + except jmespath.exceptions.ParseError as excp: + raise ValueError(f"Parse failed: {search_params}: {excp}") from excp + except TypeError as excp: + raise ValueError(f"Type error for search_params: {excp}") from excp + return search_result + +# __END__ diff --git a/src/CoreLibs/json/json_helper.py b/src/CoreLibs/json/json_helper.py new file mode 100644 index 0000000..4db2ff0 --- /dev/null +++ b/src/CoreLibs/json/json_helper.py @@ -0,0 +1,31 @@ +""" +json encoder for datetime +""" + +from typing import Any +from json import JSONEncoder +from datetime import datetime, date + + +# subclass JSONEncoder +class DateTimeEncoder(JSONEncoder): + """ + Override the default method + cls=DateTimeEncoder + """ + def default(self, o: Any) -> str | None: + if isinstance(o, (date, datetime)): + return o.isoformat() + return None + + +def default(obj: Any) -> str | None: + """ + default override + default=default + """ + if isinstance(obj, (date, datetime)): + return obj.isoformat() + return None + +# __END__ diff --git a/src/CoreLibs/listDictHandling/__init__.py b/src/CoreLibs/listDictHandling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/listDictHandling/data_search.py b/src/CoreLibs/listDictHandling/data_search.py new file mode 100644 index 0000000..d3b96b7 --- /dev/null +++ b/src/CoreLibs/listDictHandling/data_search.py @@ -0,0 +1,128 @@ +""" +wrapper around search path +""" + +from typing import Any + + +def array_search( + search_params: list[dict[str, str | bool | list[str | None]]], + data: list[dict[str, Any]], + return_index: bool = False +) -> list[dict[str, Any]]: + """ + search in an array of dicts with an array of Key/Value set + all Key/Value sets must match + Value set can be list for OR match + option: case_senstive: default True + + Args: + search_params (list): List of search 
params in "Key"/"Value" lists with options + data (list): data to search in, must be a list + return_index (bool): return index of list [default False] + + Raises: + ValueError: if search params is not a list + KeyError: if Key or Value are missing in search params + KeyError: More than one Key with the same name set + + Returns: + list: list of found elements, or if return index + list of dics with "index" and "data", where "data" holds the result list + """ + if not isinstance(search_params, list): # type: ignore + raise ValueError("search_params must be a list") + keys = [] + for search in search_params: + if not search.get('Key') or not search.get('Value'): + raise KeyError( + f"Either Key '{search.get('Key', '')}' or " + f"Value '{search.get('Value', '')}' is missing or empty" + ) + # if double key -> abort + if search.get("Key") in keys: + raise KeyError( + f"Key {search.get('Key', '')} already exists in search_params" + ) + + return_items: list[dict[str, Any]] = [] + for si_idx, search_item in enumerate(data): + # for each search entry, all must match + matching = 0 + for search in search_params: + # either Value direct or if Value is list then any of those items can match + # values are compared in lower case if case senstive is off + # lower case left side + # TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"] + if search.get("case_sensitive", True) is False: + search_value = search_item.get(str(search['Key']), "").lower() + else: + search_value = search_item.get(str(search['Key']), "") + # lower case right side + if isinstance(search['Value'], list): + search_in = [ + str(k).lower() + if search.get("case_sensitive", True) is False else k + for k in search['Value'] + ] + elif search.get("case_sensitive", True) is False: + search_in = str(search['Value']).lower() + else: + search_in = search['Value'] + # compare check + if ( + ( + isinstance(search_in, list) and + search_value in search_in + ) or + search_value == search_in + ): + matching += 1 + if len(search_params) == matching: + if return_index is True: + # the data is now in "data sub set" + return_items.append({ + "index": si_idx, + "data": search_item + }) + else: + return_items.append(search_item) + # return all found or empty list + return return_items + + +def key_lookup(haystack: dict[str, str], key: str) -> str: + """ + simple key lookup in haystack, erturns empty string if not found + + Args: + haystack (dict[str, str]): _description_ + key (str): _description_ + + Returns: + str: _description_ + """ + return haystack.get(key, "") + + +def value_lookup(haystack: dict[str, str], value: str, raise_on_many: bool = False) -> str: + """ + find by value, if not found returns empty, if not raise on many returns the first one + + Args: + haystack (dict[str, str]): _description_ + value (str): _description_ + raise_on_many (bool, optional): _description_. Defaults to False. 
+ + Raises: + ValueError: _description_ + + Returns: + str: _description_ + """ + keys = [__key for __key, __value in haystack.items() if __value == value] + if not keys: + return "" + if raise_on_many is True and len(keys) > 1: + raise ValueError("More than one element found with the same name") + return keys[0] diff --git a/src/CoreLibs/listDictHandling/dump_data.py b/src/CoreLibs/listDictHandling/dump_data.py new file mode 100644 index 0000000..95048b2 --- /dev/null +++ b/src/CoreLibs/listDictHandling/dump_data.py @@ -0,0 +1,21 @@ +""" +dict dump as JSON formatted +""" + +import json +from typing import Any + + +def dump_data(data: dict[Any, Any] | list[Any] | str | None) -> str: + """ + dump formated output from dict/list + + Args: + data (dict | list | str): _description_ + + Returns: + str: _description_ + """ + return json.dumps(data, indent=4, ensure_ascii=False, default=str) + +# __END__ diff --git a/src/CoreLibs/listDictHandling/fingerprinting.py b/src/CoreLibs/listDictHandling/fingerprinting.py new file mode 100644 index 0000000..3bcb092 --- /dev/null +++ b/src/CoreLibs/listDictHandling/fingerprinting.py @@ -0,0 +1,37 @@ +""" +Various dictionary, object and list hashers +""" + +import json +import hashlib +from typing import Any + + +def dict_hash_frozen(data: dict[Any, Any]) -> int: + """ + hash a dict via freeze + + Args: + data (dict): _description_ + + Returns: + str: _description_ + """ + return hash(frozenset(data.items())) + + +def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str: + """ + Create a sha256 hash over dict + alternative for + dict_hash_frozen + + Args: + data (dict | list): _description_ + + Returns: + str: _description_ + """ + return hashlib.sha256( + json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8') + ).hexdigest() diff --git a/src/CoreLibs/listDictHandling/manage_dict.py b/src/CoreLibs/listDictHandling/manage_dict.py new file mode 100644 index 0000000..446eb9b --- /dev/null +++ b/src/CoreLibs/listDictHandling/manage_dict.py @@ -0,0 +1,59 @@ +""" +Various helper functions for type data clean up +""" + +from typing import Any, cast + + +def delete_keys_from_set( + set_data: dict[str, Any] | list[Any] | Any, keys: list[str] +) -> dict[str, Any] | list[Any] | Any: + """ + remove all keys from set_data + + Args: + set_data (dict[str, Any] | list[Any] | None): _description_ + keys (list[str]): _description_ + + Returns: + dict[str, Any] | list[Any] | None: _description_ + """ + # skip everything if there is no keys list + if not keys: + return set_data + if isinstance(set_data, dict): + for key, value in set_data.copy().items(): + if key in keys: + del set_data[key] + if isinstance(value, (dict, list)): + delete_keys_from_set(value, keys) + elif isinstance(set_data, list): + for value in set_data: + if isinstance(value, (dict, list)): + delete_keys_from_set(value, keys) + + return set_data + + +def build_dict( + any_dict: Any, ignore_entries: list[str] | None = None +) -> dict[str, Any | list[Any] | dict[Any, Any]]: + """ + rewrite any AWS *TypeDef to new dict so we can add/change entrys + + Args: + any_dict (Any): _description_ + + Returns: + dict[str, Any | list[Any]]: _description_ + """ + if ignore_entries is None: + return cast(dict[str, Any | list[Any] | dict[Any, Any]], any_dict) + # ignore entries can be one key or key nested + # return { + # key: value for key, value in any_dict.items() if key not in ignore_entries + # } + return cast( + dict[str, Any | list[Any] | dict[Any, Any]], + delete_keys_from_set(any_dict, 
ignore_entries) + ) diff --git a/src/CoreLibs/logging/__init__.py b/src/CoreLibs/logging/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/logging/error_handling.py b/src/CoreLibs/logging/error_handling.py new file mode 100644 index 0000000..db29fea --- /dev/null +++ b/src/CoreLibs/logging/error_handling.py @@ -0,0 +1,89 @@ +""" +Collect error and warning messages as JSON blocks into an array +""" + +from typing import Any + + +class ErrorMessage: + """ + Error and Warning collector + """ + + # errors and warning + __error_list: list[dict[str, Any]] = [] + __warning_list: list[dict[str, Any]] = [] + + def reset_warnings(self): + """ + reset warnings + """ + ErrorMessage.__warning_list = [] + + def add_warning(self, message: dict[str, Any], base_message: dict[str, Any] | None = None): + """ + add one warning + + Args: + message (dict): _description_ + base_message (dict, optional): _description_. Defaults to {}. + """ + if base_message is None or not isinstance(base_message, dict): # type: ignore + base_message = {} + base_message['level'] = "Warning" + ErrorMessage.__warning_list.append(base_message | message) + + def get_warnings(self) -> list[dict[str, Any]]: + """_summary_ + + Returns: + list: _description_ + """ + return ErrorMessage.__warning_list + + def has_warnings(self) -> bool: + """ + check if there ware warnings + + Returns: + bool: _description_ + """ + return bool(ErrorMessage.__warning_list) + + def reset_errors(self): + """ + reset the error list + """ + ErrorMessage.__error_list = [] + + def add_error(self, message: dict[str, Any], base_messasge: dict[str, Any] | None = None): + """ + add one error + + Args: + error (dict): _description_ + base_error (dict, optional): _description_. Defaults to {}. + """ + if base_messasge is None or not isinstance(base_messasge, dict): # type: ignore + base_messasge = {} + base_messasge['level'] = "Error" + ErrorMessage.__error_list.append(base_messasge | message) + + def get_errors(self) -> list[dict[str, Any]]: + """_summary_ + + Returns: + list: _description_ + """ + return ErrorMessage.__error_list + + def has_errors(self) -> bool: + """ + check if there ware warnings + + Returns: + bool: _description_ + """ + return bool(ErrorMessage.__error_list) + +# __END__ diff --git a/src/CoreLibs/logging/log.py b/src/CoreLibs/logging/log.py new file mode 100644 index 0000000..1c9d642 --- /dev/null +++ b/src/CoreLibs/logging/log.py @@ -0,0 +1,120 @@ +""" +A log handler wrapper +""" + +import logging.handlers +import logging +from pathlib import Path +from typing import Mapping + + +class Log: + """ + logger setup + """ + + EXCEPTION: int = 60 + + def __init__( + self, + log_path: Path, + log_name: str, + log_level_console: str = 'WARNING', + log_level_file: str = 'DEBUG', + add_start_info: bool = True + ): + logging.addLevelName(Log.EXCEPTION, 'EXCEPTION') + if not log_name.endswith('.log'): + log_path = log_path.with_suffix('.log') + # overall logger settings + self.logger = logging.getLogger(log_name) + # set maximum logging level for all logging output + self.logger.setLevel(logging.DEBUG) + # console logger + self.__console_handler(log_level_console) + # file logger + self.__file_handler(log_level_file, log_path) + # if requests set a start log + if add_start_info is True: + self.break_line('START') + + def __filter_exceptions(self, record: logging.LogRecord) -> bool: + return record.levelname != "EXCEPTION" + + def __console_handler(self, log_level_console: str = 'WARNING'): + # console logger + if not 
isinstance(getattr(logging, log_level_console.upper(), None), int): + log_level_console = 'WARNING' + console_handler = logging.StreamHandler() + formatter_console = logging.Formatter( + ( + '[%(asctime)s.%(msecs)03d] ' + '[%(filename)s:%(funcName)s:%(lineno)d] ' + '<%(levelname)s> ' + '%(message)s' + ), + datefmt="%Y-%m-%d %H:%M:%S", + ) + console_handler.setLevel(log_level_console) + # do not show exceptions logs on console + console_handler.addFilter(self.__filter_exceptions) + console_handler.setFormatter(formatter_console) + self.logger.addHandler(console_handler) + + def __file_handler(self, log_level_file: str, log_path: Path) -> None: + # file logger + if not isinstance(getattr(logging, log_level_file.upper(), None), int): + log_level_file = 'DEBUG' + file_handler = logging.handlers.TimedRotatingFileHandler( + filename=log_path, + encoding="utf-8", + when="D", + interval=1 + ) + formatter_file_handler = logging.Formatter( + ( + '[%(asctime)s.%(msecs)03d] ' + '[%(pathname)s:%(funcName)s:%(lineno)d] ' + '[%(name)s:%(process)d] ' + '<%(levelname)s> ' + '%(message)s' + ), + datefmt="%Y-%m-%dT%H:%M:%S", + ) + file_handler.setLevel(log_level_file) + file_handler.setFormatter(formatter_file_handler) + self.logger.addHandler(file_handler) + + def break_line(self, info: str = "BREAK"): + """ + add a break line as info level + + Keyword Arguments: + info {str} -- _description_ (default: {"BREAK"}) + """ + self.logger.info("[%s] ================================>", info) + + def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None: + """ + log on exceotion level + + Args: + msg (object): _description_ + *args (object): arguments for msg + extra: Mapping[str, object] | None: extra arguments for the formatting if needed + """ + self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra) + + def validate_log_level(self, log_level: str) -> bool: + """ + if the log level is invalid, will erturn false + + Args: + log_level (str): _description_ + + Returns: + bool: _description_ + """ + return isinstance(getattr(logging, log_level.upper(), None), int) + +# __END__ diff --git a/src/CoreLibs/requests/__init__.py b/src/CoreLibs/requests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/requests/caller.py b/src/CoreLibs/requests/caller.py new file mode 100644 index 0000000..e2261b4 --- /dev/null +++ b/src/CoreLibs/requests/caller.py @@ -0,0 +1,190 @@ +""" +requests lib interface +V2 call type +""" + +from typing import Any +import warnings +import requests +# to hide the verfiy warnings because of the bad SSL settings from Netskope, Akamai, etc +warnings.filterwarnings('ignore', message='Unverified HTTPS request') + + +class Caller: + """_summary_""" + + def __init__( + self, + header: dict[str, str], + verify: bool = True, + timeout: int = 20, + proxy: dict[str, str] | None = None + ): + self.headers = header + self.timeout: int = timeout + self.cafile = "/Library/Application Support/Netskope/STAgent/data/nscacert.pem" + self.verify = verify + self.proxy = proxy + + def __timeout(self, timeout: int | None) -> int: + if timeout is not None: + return timeout + return self.timeout + + def __call( + self, + action: str, + url: str, + data: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + timeout: int | None = None + ) -> requests.Response | None: + """ + call wrapper, on error returns None + + Args: + action (str): _description_ + url (str): _description_ + data (dict | None): _description_. 
Defaults to None. + params (dict | None): _description_. Defaults to None. + + Returns: + requests.Response | None: _description_ + """ + + if data is None: + data = {} + try: + response = None + if action == "get": + response = requests.get( + url, + params=params, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "post": + response = requests.post( + url, + params=params, + json=data, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "put": + response = requests.put( + url, + params=params, + json=data, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "patch": + response = requests.patch( + url, + params=params, + json=data, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "delete": + response = requests.delete( + url, + params=params, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + return response + except requests.exceptions.InvalidSchema as e: + print(f"Invalid URL during '{action}' for {url}:\n\t{e}") + return None + except requests.exceptions.ReadTimeout as e: + print(f"Timeout ({self.timeout}s) during '{action}' for {url}:\n\t{e}") + return None + except requests.exceptions.ConnectionError as e: + print(f"Connection error during '{action}' for {url}:\n\t{e}") + return None + + def get(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None: + """ + get data + + Args: + url (str): _description_ + params (dict | None): _description_ + + Returns: + requests.Response: _description_ + """ + return self.__call('get', url, params=params) + + def post( + self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None + ) -> requests.Response | None: + """ + post data + + Args: + url (str): _description_ + data (dict | None): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('post', url, data, params) + + def put( + self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None + ) -> requests.Response | None: + """_summary_ + + Args: + url (str): _description_ + data (dict | None): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('put', url, data, params) + + def patch( + self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None + ) -> requests.Response | None: + """_summary_ + + Args: + url (str): _description_ + data (dict | None): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('patch', url, data, params) + + def delete(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None: + """ + delete + + Args: + url (str): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('delete', url, params=params) + +# __END__ diff --git a/src/CoreLibs/stringHandling/__init__.py b/src/CoreLibs/stringHandling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/stringHandling/string_helpers.py b/src/CoreLibs/stringHandling/string_helpers.py new file mode 100644 index 
0000000..29964c1 --- /dev/null +++ b/src/CoreLibs/stringHandling/string_helpers.py @@ -0,0 +1,34 @@ +""" +String helpers +""" + +from textwrap import shorten + + +def shorten_string(string: str, length: int, hard_shorten: bool = False, placeholder: str = " [~]") -> str: + """ + check if entry is too long and cut it, but only for console output + Note that if there are no spaces in the string, it will automatically use the hard split mode + + Args: + string (str): _description_ + length (int): _description_ + hard_shorten (bool): if shorte should be done on fixed string lenght. Default: False + placeholder (str): placeholder string. Default: " [~]" + + Returns: + str: _description_ + """ + length = int(length) + string = str(string) + if len(string) > length: + if hard_shorten is True or " " not in string: + short_string = f"{string[:(length - len(placeholder))]}{placeholder}" + else: + short_string = shorten(string, width=length, placeholder=placeholder) + else: + short_string = string + + return short_string + +# __END__ diff --git a/uv.lock b/uv.lock new file mode 100644 index 0000000..2b88f9e --- /dev/null +++ b/uv.lock @@ -0,0 +1,42 @@ +version = 1 +revision = 2 +requires-python = ">=3.13" + +[[package]] +name = "corelibs-python" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "jmespath" }, + { name = "psutil" }, +] + +[package.metadata] +requires-dist = [ + { name = "jmespath", specifier = ">=1.0.1" }, + { name = "psutil", specifier = ">=7.0.0" }, +] + +[[package]] +name = "jmespath" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843, upload-time = "2022-06-17T18:00:12.224Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" }, +] + +[[package]] +name = "psutil" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003, upload-time = "2025-02-13T21:54:07.946Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051, upload-time = "2025-02-13T21:54:12.36Z" }, + { url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535, upload-time = "2025-02-13T21:54:16.07Z" }, + { url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004, upload-time = 
"2025-02-13T21:54:18.662Z" }, + { url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986, upload-time = "2025-02-13T21:54:21.811Z" }, + { url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544, upload-time = "2025-02-13T21:54:24.68Z" }, + { url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" }, + { url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" }, +]