From e778e7a42f606c97229a447e945405a9f46a5d35 Mon Sep 17 00:00:00 2001 From: Clemens Schwaighofer Date: Tue, 1 Jul 2025 15:05:32 +0900 Subject: [PATCH] CoreLibs for Python this is an intial test install. The folder and file names will differ and some things will move around. 1.0.0 will have the first usable setup --- .gitignore | 5 + ReadMe.md | 19 ++ pyproject.toml | 48 +++++ src/CoreLibs/__init__.py | 0 src/CoreLibs/csv/__init__.py | 0 src/CoreLibs/csv/csv_writer.py | 91 +++++++++ src/CoreLibs/debug/__init__.py | 0 src/CoreLibs/debug/profiling.py | 126 ++++++++++++ src/CoreLibs/debug/timer.py | 114 +++++++++++ src/CoreLibs/debug/writeline.py | 75 +++++++ src/CoreLibs/fileHandling/__init__.py | 0 src/CoreLibs/fileHandling/file_crc.py | 44 ++++ src/CoreLibs/fileHandling/file_handling.py | 46 +++++ src/CoreLibs/json/__init__.py | 0 src/CoreLibs/json/jmespath_helper.py | 35 ++++ src/CoreLibs/json/json_helper.py | 31 +++ src/CoreLibs/listDictHandling/__init__.py | 0 src/CoreLibs/listDictHandling/data_search.py | 128 ++++++++++++ src/CoreLibs/listDictHandling/dump_data.py | 21 ++ .../listDictHandling/fingerprinting.py | 37 ++++ src/CoreLibs/listDictHandling/manage_dict.py | 59 ++++++ src/CoreLibs/logging/__init__.py | 0 src/CoreLibs/logging/error_handling.py | 89 ++++++++ src/CoreLibs/logging/log.py | 120 +++++++++++ src/CoreLibs/requests/__init__.py | 0 src/CoreLibs/requests/caller.py | 190 ++++++++++++++++++ src/CoreLibs/stringHandling/__init__.py | 0 src/CoreLibs/stringHandling/string_helpers.py | 34 ++++ uv.lock | 42 ++++ 29 files changed, 1354 insertions(+) create mode 100644 .gitignore create mode 100644 ReadMe.md create mode 100644 pyproject.toml create mode 100644 src/CoreLibs/__init__.py create mode 100644 src/CoreLibs/csv/__init__.py create mode 100644 src/CoreLibs/csv/csv_writer.py create mode 100644 src/CoreLibs/debug/__init__.py create mode 100644 src/CoreLibs/debug/profiling.py create mode 100644 src/CoreLibs/debug/timer.py create mode 100644 src/CoreLibs/debug/writeline.py create mode 100644 src/CoreLibs/fileHandling/__init__.py create mode 100644 src/CoreLibs/fileHandling/file_crc.py create mode 100644 src/CoreLibs/fileHandling/file_handling.py create mode 100644 src/CoreLibs/json/__init__.py create mode 100644 src/CoreLibs/json/jmespath_helper.py create mode 100644 src/CoreLibs/json/json_helper.py create mode 100644 src/CoreLibs/listDictHandling/__init__.py create mode 100644 src/CoreLibs/listDictHandling/data_search.py create mode 100644 src/CoreLibs/listDictHandling/dump_data.py create mode 100644 src/CoreLibs/listDictHandling/fingerprinting.py create mode 100644 src/CoreLibs/listDictHandling/manage_dict.py create mode 100644 src/CoreLibs/logging/__init__.py create mode 100644 src/CoreLibs/logging/error_handling.py create mode 100644 src/CoreLibs/logging/log.py create mode 100644 src/CoreLibs/requests/__init__.py create mode 100644 src/CoreLibs/requests/caller.py create mode 100644 src/CoreLibs/stringHandling/__init__.py create mode 100644 src/CoreLibs/stringHandling/string_helpers.py create mode 100644 uv.lock diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ad202a0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.venv/ +**/__pycache__/* +**/*.egg-info +.mypy_cache/ +**/.env diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..be75417 --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,19 @@ +# CoreLibs for Python + +This is a pip package that can be installed into any project and covers the following pars + +- loggingn update with exception logs +- requests wrapper for easier auth pass on access +- dict fingerprinting +- jmespath search +- dump outputs for data +- progress printing + +## Python venv setup + +In the folder where the script will be located + +```sh +uv venv --python 3.13 +``` + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9b83a34 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,48 @@ +# MARK: Project info +[project] +name = "corelibs-python" +version = "0.1.0" +description = "Collection of utils for Python scripts" +readme = "ReadMe.md" +requires-python = ">=3.13" +dependencies = [ + "jmespath>=1.0.1", + "psutil>=7.0.0", +] +# set this to disable publish to pypi (pip) +# classifiers = ["Private :: Do Not Upload"] + +# MARK: build target +[[tool.uv.index]] +name = "egra-gitea" +url = "https://git.egplusww.jp/org/PyPI/dashboard" +publish-url = "https://git.egplusww.jp/api/packages/PyPI/pypi" +explicit = true + +# MARK: build system +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +# MARK: Python tools +[tool.pyright] +typeCheckingMode = "strict" +reportMissingImports = "information" +reportMissingTypeStubs = "information" +reportUnknownMemberType = "information" +[tool.ruff] +line-length = 120 +[tool.black] +# set 10 short for better formatting +line-length = 110 +# to avoid " ... " " ... " string sets +# experimental-string-processing = true +preview = true +enable-unstable-feature = ["string_processing"] +[tool.pylint.format] +max-line-length = 120 +[tool.pylint.miscellaneous] +notes = ["FIXME", "TODO"] +notes-rgx = '(FIXME|TODO)(\((TTD-|#)\[0-9]+\))' +[tool.flake8] +max-line-length = 120 diff --git a/src/CoreLibs/__init__.py b/src/CoreLibs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/csv/__init__.py b/src/CoreLibs/csv/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/csv/csv_writer.py b/src/CoreLibs/csv/csv_writer.py new file mode 100644 index 0000000..ff4a529 --- /dev/null +++ b/src/CoreLibs/csv/csv_writer.py @@ -0,0 +1,91 @@ +""" +Write to CSV file +- each class set is one file write with one header set +""" + +from typing import Any +from pathlib import Path +from collections import Counter +import csv + + +class CsvWriter: + """ + write to a CSV file + """ + + def __init__( + self, + path: Path, + file_name: str, + header: dict[str, str], + header_order: list[str] | None = None + ): + self.path = path + self.file_name = file_name + # Key: index for write for the line dict, Values: header entries + self.header = header + self.csv_file_writer = self.__open_csv(header_order) + + def __open_csv(self, header_order: list[str] | None) -> 'csv.DictWriter[str] | None': + """ + open csv file for writing, write headers + + Note that if there is no header_order set we use the order in header dictionary + + Arguments: + line {list[str] | None} -- optional dedicated header order + + Returns: + csv.DictWriter[str] | None: _description_ + """ + # if header order is set, make sure all header value fields exist + header_values = self.header.values() + if header_order is not None: + if Counter(header_values) != Counter(header_order): + print( + "header order does not match header values: " + f"{', '.join(header_values)} != {', '.join(header_order)}" + ) + return None + header_values = header_order + # no duplicates + if len(header_values) != len(set(header_values)): + print(f"Header must have unique values only: {', '.join(header_values)}") + return None + try: + fp = open( + self.path.joinpath(self.file_name), + "w", encoding="utf-8" + ) + csv_file_writer = csv.DictWriter( + fp, + fieldnames=header_values, + delimiter=",", + quotechar='"', + quoting=csv.QUOTE_MINIMAL, + ) + csv_file_writer.writeheader() + return csv_file_writer + except OSError as err: + print("OS error:", err) + return None + + def write_csv(self, line: dict[str, str]) -> bool: + """ + write member csv line + + Arguments: + line {dict[str, str]} -- _description_ + + Returns: + bool -- _description_ + """ + if self.csv_file_writer is None: + return False + csv_row: dict[str, Any] = {} + # only write entries that are in the header list + for key, value in self.header.items(): + csv_row[value] = line[key] + self.csv_file_writer.writerow(csv_row) + return True diff --git a/src/CoreLibs/debug/__init__.py b/src/CoreLibs/debug/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/debug/profiling.py b/src/CoreLibs/debug/profiling.py new file mode 100644 index 0000000..75f80da --- /dev/null +++ b/src/CoreLibs/debug/profiling.py @@ -0,0 +1,126 @@ +""" +Profile memory usage in Python +""" + +# https://docs.python.org/3/library/tracemalloc.html + +import os +import time +import tracemalloc +import linecache +from typing import Tuple +from tracemalloc import Snapshot +import psutil + + +def display_top(snapshot: Snapshot, key_type: str = 'lineno', limit: int = 10) -> str: + """ + Print tracmalloc stats + https://docs.python.org/3/library/tracemalloc.html#pretty-top + + Args: + snapshot (Snapshot): _description_ + key_type (str, optional): _description_. Defaults to 'lineno'. + limit (int, optional): _description_. Defaults to 10. + """ + snapshot = snapshot.filter_traces(( + tracemalloc.Filter(False, ""), + tracemalloc.Filter(False, ""), + )) + top_stats = snapshot.statistics(key_type) + + profiler_msg = f"Top {limit} lines" + for index, stat in enumerate(top_stats[:limit], 1): + frame = stat.traceback[0] + # replace "/path/to/module/file.py" with "module/file.py" + filename = os.sep.join(frame.filename.split(os.sep)[-2:]) + profiler_msg += f"#{index}: {filename}:{frame.lineno}: {(stat.size / 1024):.1f} KiB" + line = linecache.getline(frame.filename, frame.lineno).strip() + if line: + profiler_msg += f" {line}" + + other = top_stats[limit:] + if other: + size = sum(stat.size for stat in other) + profiler_msg += f"{len(other)} other: {(size / 1024):.1f} KiB" + total = sum(stat.size for stat in top_stats) + profiler_msg += f"Total allocated size: {(total / 1024):.1f} KiB" + return profiler_msg + + +class Profiling: + """ + Profile memory usage and elapsed time for some block + Based on: https://stackoverflow.com/a/53301648 + """ + + def __init__(self): + # profiling id + self.__ident: str = '' + # memory + self.__rss_before: int = 0 + self.__vms_before: int = 0 + # self.shared_before: int = 0 + self.__rss_used: int = 0 + self.__vms_used: int = 0 + # self.shared_used: int = 0 + # time + self.__call_start: float = 0 + self.__elapsed = 0 + + def __get_process_memory(self) -> Tuple[int, int]: + process = psutil.Process(os.getpid()) + mi = process.memory_info() + # macos does not have mi.shared + return mi.rss, mi.vms + + def __elapsed_since(self) -> str: + elapsed = time.time() - self.__call_start + if elapsed < 1: + return str(round(elapsed * 1000, 2)) + "ms" + if elapsed < 60: + return str(round(elapsed, 2)) + "s" + if elapsed < 3600: + return str(round(elapsed / 60, 2)) + "min" + return str(round(elapsed / 3600, 2)) + "hrs" + + def __format_bytes(self, bytes_data: int) -> str: + if abs(bytes_data) < 1000: + return str(bytes_data) + "B" + if abs(bytes_data) < 1e6: + return str(round(bytes_data / 1e3, 2)) + "kB" + if abs(bytes_data) < 1e9: + return str(round(bytes_data / 1e6, 2)) + "MB" + return str(round(bytes_data / 1e9, 2)) + "GB" + + def start_profiling(self, ident: str) -> None: + """ + start the profiling + """ + self.__ident = ident + self.__rss_before, self.__vms_before = self.__get_process_memory() + self.__call_start = time.time() + + def end_profiling(self) -> None: + """ + end the profiling + """ + if self.__rss_before == 0 and self.__vms_before == 0: + print("start_profile() was not called, output will be negative") + self.__elapsed = self.__elapsed_since() + __rss_after, __vms_after = self.__get_process_memory() + self.__rss_used = __rss_after - self.__rss_before + self.__vms_used = __vms_after - self.__vms_before + + def print_profiling(self) -> str: + """ + print the profiling time + """ + return ( + f"Profiling: {self.__ident:>20} " + f"RSS: {self.__format_bytes(self.__rss_used):>8} | " + f"VMS: {self.__format_bytes(self.__vms_used):>8} | " + f"time: {self.__elapsed:>8}" + ) + +# __END__ diff --git a/src/CoreLibs/debug/timer.py b/src/CoreLibs/debug/timer.py new file mode 100644 index 0000000..a2c5d26 --- /dev/null +++ b/src/CoreLibs/debug/timer.py @@ -0,0 +1,114 @@ +""" +a interval time class + +Returns: + Timer: class timer for basic time run calculations +""" + +from datetime import datetime, timedelta + + +class Timer: + """ + get difference between start and end date/time + """ + + def __init__(self): + """ + init new start time and set end time to None + """ + self._overall_start_time = datetime.now() + self._overall_end_time = None + self._overall_run_time = None + self._start_time = datetime.now() + self._end_time = None + self._run_time = None + + # MARK: overall run time + def overall_run_time(self) -> timedelta: + """ + overall run time difference from class launch to call of this function + + Returns: + timedelta: _description_ + """ + self._overall_end_time = datetime.now() + self._overall_run_time = self._overall_end_time - self._overall_start_time + return self._overall_run_time + + def get_overall_start_time(self) -> datetime: + """ + get set start time + + Returns: + datetime: _description_ + """ + return self._overall_start_time + + def get_overall_end_time(self) -> datetime | None: + """ + get set end time or None for not set + + Returns: + datetime|None: _description_ + """ + return self._overall_end_time + + def get_overall_run_time(self) -> timedelta | None: + """ + get run time or None if run time was not called + + Returns: + datetime|None: _description_ + """ + return self._overall_run_time + + # MARK: set run time + def run_time(self) -> timedelta: + """ + difference between start time and current time + + Returns: + datetime: _description_ + """ + self._end_time = datetime.now() + self._run_time = self._end_time - self._start_time + return self._run_time + + def reset_run_time(self): + """ + reset start/end and run tine + """ + self._start_time = datetime.now() + self._end_time = None + self._run_time = None + + def get_start_time(self) -> datetime: + """ + get set start time + + Returns: + datetime: _description_ + """ + return self._start_time + + def get_end_time(self) -> datetime | None: + """ + get set end time or None for not set + + Returns: + datetime|None: _description_ + """ + return self._end_time + + def get_run_time(self) -> timedelta | None: + """ + get run time or None if run time was not called + + Returns: + datetime|None: _description_ + """ + return self._run_time + + +# __END__ diff --git a/src/CoreLibs/debug/writeline.py b/src/CoreLibs/debug/writeline.py new file mode 100644 index 0000000..cb2c6e2 --- /dev/null +++ b/src/CoreLibs/debug/writeline.py @@ -0,0 +1,75 @@ +""" +Various small helpers for data writing +""" + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from io import TextIOWrapper + + +def write_l(line: str, fpl: 'TextIOWrapper | None' = None, print_line: bool = False): + """ + Write a line to screen and to output file + + Args: + line (String): Line to write + fpl (Resource): file handler resource, if none write only to console + """ + if print_line is True: + print(line) + if fpl is not None: + fpl.write(line + "\n") + + +# progress printers + +def pr_header(tag: str, marker_string: str = '#', width: int = 35): + """_summary_ + + Args: + tag (str): _description_ + """ + print(f" {marker_string} {tag:^{width}} {marker_string}") + + +def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35): + """_summary_ + + Args: + tag (str): _description_ + prefix_string (str, optional): _description_. Defaults to '|'. + """ + print(f" {prefix_string} {tag:{space_filler}<{width}}:", flush=True) + + +def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35): + """ + writen progress open line with tag + + Args: + tag (str): _description_ + prefix_string (str): prefix string. Default: '|' + """ + print(f" {prefix_string} {tag:{space_filler}<{width}} [", end="", flush=True) + + +def pr_close(tag: str = ''): + """ + write the close tag with new line + + Args: + tag (str, optional): _description_. Defaults to ''. + """ + print(f"{tag}]", flush=True) + + +def pr_act(act: str = "."): + """ + write progress character + + Args: + act (str, optional): _description_. Defaults to ".". + """ + print(f"{act}", end="", flush=True) + +# __EMD__ diff --git a/src/CoreLibs/fileHandling/__init__.py b/src/CoreLibs/fileHandling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/fileHandling/file_crc.py b/src/CoreLibs/fileHandling/file_crc.py new file mode 100644 index 0000000..3bc2fa6 --- /dev/null +++ b/src/CoreLibs/fileHandling/file_crc.py @@ -0,0 +1,44 @@ +""" +crc handlers for file CRC +""" + +import zlib +from pathlib import Path + + +def file_crc(file_path: Path) -> str: + """ + With for loop and buffer, create file crc32 + + Args: + file_path (str | Path): _description_ + + Returns: + str: file crc32 + """ + crc = 0 + with open(file_path, 'rb', 65536) as ins: + for _ in range(int((file_path.stat().st_size / 65536)) + 1): + crc = zlib.crc32(ins.read(65536), crc) + return f"{crc & 0xFFFFFFFF:08X}" + + +def file_name_crc(file_path: Path, add_parent_folder: bool = False) -> str: + """ + either returns file name only from path + eg: /foo/bar/baz/file.csv will be file.csv + or + return the first parent path from path + file + eg: /foo/bar/baz/file.csv will be baz/file.csv + + Args: + file_path (Path): _description_ + add_parent_folder (bool, optional): _description_. Defaults to False. + + Returns: + str: file name as string + """ + if add_parent_folder: + return str(Path(file_path.parent.name).joinpath(file_path.name)) + else: + return file_path.name diff --git a/src/CoreLibs/fileHandling/file_handling.py b/src/CoreLibs/fileHandling/file_handling.py new file mode 100644 index 0000000..0672cfe --- /dev/null +++ b/src/CoreLibs/fileHandling/file_handling.py @@ -0,0 +1,46 @@ +""" +File handling utilities +""" + +import os +import shutil +from pathlib import Path + + +def remove_all_in_directory(directory: Path, ignore_files: list[str] | None = None, verbose: bool = False) -> bool: + """ + remove all files and folders in a directory + can exclude files or folders + + Args: + directory (Path): _description_ + ignore_files (list[str], optional): _description_. Defaults to None. + + Returns: + bool: _description_ + """ + if not directory.is_dir(): + return False + if ignore_files is None: + ignore_files = [] + if verbose: + print(f"Remove old files in: {directory.name} [", end="", flush=True) + # remove all files and folders in given directory by recursive globbing + for file in directory.rglob("*"): + # skip if in ignore files + if file.name in ignore_files: + continue + # remove one file, or a whole directory + if file.is_file(): + os.remove(file) + if verbose: + print(".", end="", flush=True) + elif file.is_dir(): + shutil.rmtree(file) + if verbose: + print("/", end="", flush=True) + if verbose: + print("]", flush=True) + return True + +# __END__ diff --git a/src/CoreLibs/json/__init__.py b/src/CoreLibs/json/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/json/jmespath_helper.py b/src/CoreLibs/json/jmespath_helper.py new file mode 100644 index 0000000..2bb9e47 --- /dev/null +++ b/src/CoreLibs/json/jmespath_helper.py @@ -0,0 +1,35 @@ +""" +helper functions for jmespath interfaces +""" + +from typing import Any +import jmespath +import jmespath.exceptions + + +def jmespath_search(search_data: dict[Any, Any] | list[Any], search_params: str) -> Any: + """ + jmespath search wrapper + + Args: + search_data (dict | list): _description_ + search_params (str): _description_ + + Raises: + ValueError: jmespath.exceptions.LexerError + ValueError: jmespath.exceptions.ParseError + + Returns: + Any: dict/list/etc, None if nothing found + """ + try: + search_result = jmespath.search(search_params, search_data) + except jmespath.exceptions.LexerError as excp: + raise ValueError(f"Compile failed: {search_params}: {excp}") from excp + except jmespath.exceptions.ParseError as excp: + raise ValueError(f"Parse failed: {search_params}: {excp}") from excp + except TypeError as excp: + raise ValueError(f"Type error for search_params: {excp}") from excp + return search_result + +# __END__ diff --git a/src/CoreLibs/json/json_helper.py b/src/CoreLibs/json/json_helper.py new file mode 100644 index 0000000..4db2ff0 --- /dev/null +++ b/src/CoreLibs/json/json_helper.py @@ -0,0 +1,31 @@ +""" +json encoder for datetime +""" + +from typing import Any +from json import JSONEncoder +from datetime import datetime, date + + +# subclass JSONEncoder +class DateTimeEncoder(JSONEncoder): + """ + Override the default method + cls=DateTimeEncoder + """ + def default(self, o: Any) -> str | None: + if isinstance(o, (date, datetime)): + return o.isoformat() + return None + + +def default(obj: Any) -> str | None: + """ + default override + default=default + """ + if isinstance(obj, (date, datetime)): + return obj.isoformat() + return None + +# __END__ diff --git a/src/CoreLibs/listDictHandling/__init__.py b/src/CoreLibs/listDictHandling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/listDictHandling/data_search.py b/src/CoreLibs/listDictHandling/data_search.py new file mode 100644 index 0000000..d3b96b7 --- /dev/null +++ b/src/CoreLibs/listDictHandling/data_search.py @@ -0,0 +1,128 @@ +""" +wrapper around search path +""" + +from typing import Any + + +def array_search( + search_params: list[dict[str, str | bool | list[str | None]]], + data: list[dict[str, Any]], + return_index: bool = False +) -> list[dict[str, Any]]: + """ + search in an array of dicts with an array of Key/Value set + all Key/Value sets must match + Value set can be list for OR match + option: case_senstive: default True + + Args: + search_params (list): List of search params in "Key"/"Value" lists with options + data (list): data to search in, must be a list + return_index (bool): return index of list [default False] + + Raises: + ValueError: if search params is not a list + KeyError: if Key or Value are missing in search params + KeyError: More than one Key with the same name set + + Returns: + list: list of found elements, or if return index + list of dics with "index" and "data", where "data" holds the result list + """ + if not isinstance(search_params, list): # type: ignore + raise ValueError("search_params must be a list") + keys = [] + for search in search_params: + if not search.get('Key') or not search.get('Value'): + raise KeyError( + f"Either Key '{search.get('Key', '')}' or " + f"Value '{search.get('Value', '')}' is missing or empty" + ) + # if double key -> abort + if search.get("Key") in keys: + raise KeyError( + f"Key {search.get('Key', '')} already exists in search_params" + ) + + return_items: list[dict[str, Any]] = [] + for si_idx, search_item in enumerate(data): + # for each search entry, all must match + matching = 0 + for search in search_params: + # either Value direct or if Value is list then any of those items can match + # values are compared in lower case if case senstive is off + # lower case left side + # TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"] + if search.get("case_sensitive", True) is False: + search_value = search_item.get(str(search['Key']), "").lower() + else: + search_value = search_item.get(str(search['Key']), "") + # lower case right side + if isinstance(search['Value'], list): + search_in = [ + str(k).lower() + if search.get("case_sensitive", True) is False else k + for k in search['Value'] + ] + elif search.get("case_sensitive", True) is False: + search_in = str(search['Value']).lower() + else: + search_in = search['Value'] + # compare check + if ( + ( + isinstance(search_in, list) and + search_value in search_in + ) or + search_value == search_in + ): + matching += 1 + if len(search_params) == matching: + if return_index is True: + # the data is now in "data sub set" + return_items.append({ + "index": si_idx, + "data": search_item + }) + else: + return_items.append(search_item) + # return all found or empty list + return return_items + + +def key_lookup(haystack: dict[str, str], key: str) -> str: + """ + simple key lookup in haystack, erturns empty string if not found + + Args: + haystack (dict[str, str]): _description_ + key (str): _description_ + + Returns: + str: _description_ + """ + return haystack.get(key, "") + + +def value_lookup(haystack: dict[str, str], value: str, raise_on_many: bool = False) -> str: + """ + find by value, if not found returns empty, if not raise on many returns the first one + + Args: + haystack (dict[str, str]): _description_ + value (str): _description_ + raise_on_many (bool, optional): _description_. Defaults to False. + + Raises: + ValueError: _description_ + + Returns: + str: _description_ + """ + keys = [__key for __key, __value in haystack.items() if __value == value] + if not keys: + return "" + if raise_on_many is True and len(keys) > 1: + raise ValueError("More than one element found with the same name") + return keys[0] diff --git a/src/CoreLibs/listDictHandling/dump_data.py b/src/CoreLibs/listDictHandling/dump_data.py new file mode 100644 index 0000000..95048b2 --- /dev/null +++ b/src/CoreLibs/listDictHandling/dump_data.py @@ -0,0 +1,21 @@ +""" +dict dump as JSON formatted +""" + +import json +from typing import Any + + +def dump_data(data: dict[Any, Any] | list[Any] | str | None) -> str: + """ + dump formated output from dict/list + + Args: + data (dict | list | str): _description_ + + Returns: + str: _description_ + """ + return json.dumps(data, indent=4, ensure_ascii=False, default=str) + +# __END__ diff --git a/src/CoreLibs/listDictHandling/fingerprinting.py b/src/CoreLibs/listDictHandling/fingerprinting.py new file mode 100644 index 0000000..3bcb092 --- /dev/null +++ b/src/CoreLibs/listDictHandling/fingerprinting.py @@ -0,0 +1,37 @@ +""" +Various dictionary, object and list hashers +""" + +import json +import hashlib +from typing import Any + + +def dict_hash_frozen(data: dict[Any, Any]) -> int: + """ + hash a dict via freeze + + Args: + data (dict): _description_ + + Returns: + str: _description_ + """ + return hash(frozenset(data.items())) + + +def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str: + """ + Create a sha256 hash over dict + alternative for + dict_hash_frozen + + Args: + data (dict | list): _description_ + + Returns: + str: _description_ + """ + return hashlib.sha256( + json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8') + ).hexdigest() diff --git a/src/CoreLibs/listDictHandling/manage_dict.py b/src/CoreLibs/listDictHandling/manage_dict.py new file mode 100644 index 0000000..446eb9b --- /dev/null +++ b/src/CoreLibs/listDictHandling/manage_dict.py @@ -0,0 +1,59 @@ +""" +Various helper functions for type data clean up +""" + +from typing import Any, cast + + +def delete_keys_from_set( + set_data: dict[str, Any] | list[Any] | Any, keys: list[str] +) -> dict[str, Any] | list[Any] | Any: + """ + remove all keys from set_data + + Args: + set_data (dict[str, Any] | list[Any] | None): _description_ + keys (list[str]): _description_ + + Returns: + dict[str, Any] | list[Any] | None: _description_ + """ + # skip everything if there is no keys list + if not keys: + return set_data + if isinstance(set_data, dict): + for key, value in set_data.copy().items(): + if key in keys: + del set_data[key] + if isinstance(value, (dict, list)): + delete_keys_from_set(value, keys) + elif isinstance(set_data, list): + for value in set_data: + if isinstance(value, (dict, list)): + delete_keys_from_set(value, keys) + + return set_data + + +def build_dict( + any_dict: Any, ignore_entries: list[str] | None = None +) -> dict[str, Any | list[Any] | dict[Any, Any]]: + """ + rewrite any AWS *TypeDef to new dict so we can add/change entrys + + Args: + any_dict (Any): _description_ + + Returns: + dict[str, Any | list[Any]]: _description_ + """ + if ignore_entries is None: + return cast(dict[str, Any | list[Any] | dict[Any, Any]], any_dict) + # ignore entries can be one key or key nested + # return { + # key: value for key, value in any_dict.items() if key not in ignore_entries + # } + return cast( + dict[str, Any | list[Any] | dict[Any, Any]], + delete_keys_from_set(any_dict, ignore_entries) + ) diff --git a/src/CoreLibs/logging/__init__.py b/src/CoreLibs/logging/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/logging/error_handling.py b/src/CoreLibs/logging/error_handling.py new file mode 100644 index 0000000..db29fea --- /dev/null +++ b/src/CoreLibs/logging/error_handling.py @@ -0,0 +1,89 @@ +""" +Collect error and warning messages as JSON blocks into an array +""" + +from typing import Any + + +class ErrorMessage: + """ + Error and Warning collector + """ + + # errors and warning + __error_list: list[dict[str, Any]] = [] + __warning_list: list[dict[str, Any]] = [] + + def reset_warnings(self): + """ + reset warnings + """ + ErrorMessage.__warning_list = [] + + def add_warning(self, message: dict[str, Any], base_message: dict[str, Any] | None = None): + """ + add one warning + + Args: + message (dict): _description_ + base_message (dict, optional): _description_. Defaults to {}. + """ + if base_message is None or not isinstance(base_message, dict): # type: ignore + base_message = {} + base_message['level'] = "Warning" + ErrorMessage.__warning_list.append(base_message | message) + + def get_warnings(self) -> list[dict[str, Any]]: + """_summary_ + + Returns: + list: _description_ + """ + return ErrorMessage.__warning_list + + def has_warnings(self) -> bool: + """ + check if there ware warnings + + Returns: + bool: _description_ + """ + return bool(ErrorMessage.__warning_list) + + def reset_errors(self): + """ + reset the error list + """ + ErrorMessage.__error_list = [] + + def add_error(self, message: dict[str, Any], base_messasge: dict[str, Any] | None = None): + """ + add one error + + Args: + error (dict): _description_ + base_error (dict, optional): _description_. Defaults to {}. + """ + if base_messasge is None or not isinstance(base_messasge, dict): # type: ignore + base_messasge = {} + base_messasge['level'] = "Error" + ErrorMessage.__error_list.append(base_messasge | message) + + def get_errors(self) -> list[dict[str, Any]]: + """_summary_ + + Returns: + list: _description_ + """ + return ErrorMessage.__error_list + + def has_errors(self) -> bool: + """ + check if there ware warnings + + Returns: + bool: _description_ + """ + return bool(ErrorMessage.__error_list) + +# __END__ diff --git a/src/CoreLibs/logging/log.py b/src/CoreLibs/logging/log.py new file mode 100644 index 0000000..1c9d642 --- /dev/null +++ b/src/CoreLibs/logging/log.py @@ -0,0 +1,120 @@ +""" +A log handler wrapper +""" + +import logging.handlers +import logging +from pathlib import Path +from typing import Mapping + + +class Log: + """ + logger setup + """ + + EXCEPTION: int = 60 + + def __init__( + self, + log_path: Path, + log_name: str, + log_level_console: str = 'WARNING', + log_level_file: str = 'DEBUG', + add_start_info: bool = True + ): + logging.addLevelName(Log.EXCEPTION, 'EXCEPTION') + if not log_name.endswith('.log'): + log_path = log_path.with_suffix('.log') + # overall logger settings + self.logger = logging.getLogger(log_name) + # set maximum logging level for all logging output + self.logger.setLevel(logging.DEBUG) + # console logger + self.__console_handler(log_level_console) + # file logger + self.__file_handler(log_level_file, log_path) + # if requests set a start log + if add_start_info is True: + self.break_line('START') + + def __filter_exceptions(self, record: logging.LogRecord) -> bool: + return record.levelname != "EXCEPTION" + + def __console_handler(self, log_level_console: str = 'WARNING'): + # console logger + if not isinstance(getattr(logging, log_level_console.upper(), None), int): + log_level_console = 'WARNING' + console_handler = logging.StreamHandler() + formatter_console = logging.Formatter( + ( + '[%(asctime)s.%(msecs)03d] ' + '[%(filename)s:%(funcName)s:%(lineno)d] ' + '<%(levelname)s> ' + '%(message)s' + ), + datefmt="%Y-%m-%d %H:%M:%S", + ) + console_handler.setLevel(log_level_console) + # do not show exceptions logs on console + console_handler.addFilter(self.__filter_exceptions) + console_handler.setFormatter(formatter_console) + self.logger.addHandler(console_handler) + + def __file_handler(self, log_level_file: str, log_path: Path) -> None: + # file logger + if not isinstance(getattr(logging, log_level_file.upper(), None), int): + log_level_file = 'DEBUG' + file_handler = logging.handlers.TimedRotatingFileHandler( + filename=log_path, + encoding="utf-8", + when="D", + interval=1 + ) + formatter_file_handler = logging.Formatter( + ( + '[%(asctime)s.%(msecs)03d] ' + '[%(pathname)s:%(funcName)s:%(lineno)d] ' + '[%(name)s:%(process)d] ' + '<%(levelname)s> ' + '%(message)s' + ), + datefmt="%Y-%m-%dT%H:%M:%S", + ) + file_handler.setLevel(log_level_file) + file_handler.setFormatter(formatter_file_handler) + self.logger.addHandler(file_handler) + + def break_line(self, info: str = "BREAK"): + """ + add a break line as info level + + Keyword Arguments: + info {str} -- _description_ (default: {"BREAK"}) + """ + self.logger.info("[%s] ================================>", info) + + def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None: + """ + log on exceotion level + + Args: + msg (object): _description_ + *args (object): arguments for msg + extra: Mapping[str, object] | None: extra arguments for the formatting if needed + """ + self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra) + + def validate_log_level(self, log_level: str) -> bool: + """ + if the log level is invalid, will erturn false + + Args: + log_level (str): _description_ + + Returns: + bool: _description_ + """ + return isinstance(getattr(logging, log_level.upper(), None), int) + +# __END__ diff --git a/src/CoreLibs/requests/__init__.py b/src/CoreLibs/requests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/requests/caller.py b/src/CoreLibs/requests/caller.py new file mode 100644 index 0000000..e2261b4 --- /dev/null +++ b/src/CoreLibs/requests/caller.py @@ -0,0 +1,190 @@ +""" +requests lib interface +V2 call type +""" + +from typing import Any +import warnings +import requests +# to hide the verfiy warnings because of the bad SSL settings from Netskope, Akamai, etc +warnings.filterwarnings('ignore', message='Unverified HTTPS request') + + +class Caller: + """_summary_""" + + def __init__( + self, + header: dict[str, str], + verify: bool = True, + timeout: int = 20, + proxy: dict[str, str] | None = None + ): + self.headers = header + self.timeout: int = timeout + self.cafile = "/Library/Application Support/Netskope/STAgent/data/nscacert.pem" + self.verify = verify + self.proxy = proxy + + def __timeout(self, timeout: int | None) -> int: + if timeout is not None: + return timeout + return self.timeout + + def __call( + self, + action: str, + url: str, + data: dict[str, Any] | None = None, + params: dict[str, Any] | None = None, + timeout: int | None = None + ) -> requests.Response | None: + """ + call wrapper, on error returns None + + Args: + action (str): _description_ + url (str): _description_ + data (dict | None): _description_. Defaults to None. + params (dict | None): _description_. Defaults to None. + + Returns: + requests.Response | None: _description_ + """ + + if data is None: + data = {} + try: + response = None + if action == "get": + response = requests.get( + url, + params=params, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "post": + response = requests.post( + url, + params=params, + json=data, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "put": + response = requests.put( + url, + params=params, + json=data, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "patch": + response = requests.patch( + url, + params=params, + json=data, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + elif action == "delete": + response = requests.delete( + url, + params=params, + headers=self.headers, + timeout=self.__timeout(timeout), + verify=self.verify, + proxies=self.proxy + ) + return response + except requests.exceptions.InvalidSchema as e: + print(f"Invalid URL during '{action}' for {url}:\n\t{e}") + return None + except requests.exceptions.ReadTimeout as e: + print(f"Timeout ({self.timeout}s) during '{action}' for {url}:\n\t{e}") + return None + except requests.exceptions.ConnectionError as e: + print(f"Connection error during '{action}' for {url}:\n\t{e}") + return None + + def get(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None: + """ + get data + + Args: + url (str): _description_ + params (dict | None): _description_ + + Returns: + requests.Response: _description_ + """ + return self.__call('get', url, params=params) + + def post( + self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None + ) -> requests.Response | None: + """ + post data + + Args: + url (str): _description_ + data (dict | None): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('post', url, data, params) + + def put( + self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None + ) -> requests.Response | None: + """_summary_ + + Args: + url (str): _description_ + data (dict | None): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('put', url, data, params) + + def patch( + self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None + ) -> requests.Response | None: + """_summary_ + + Args: + url (str): _description_ + data (dict | None): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('patch', url, data, params) + + def delete(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None: + """ + delete + + Args: + url (str): _description_ + params (dict | None): _description_ + + Returns: + requests.Response | None: _description_ + """ + return self.__call('delete', url, params=params) + +# __END__ diff --git a/src/CoreLibs/stringHandling/__init__.py b/src/CoreLibs/stringHandling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/CoreLibs/stringHandling/string_helpers.py b/src/CoreLibs/stringHandling/string_helpers.py new file mode 100644 index 0000000..29964c1 --- /dev/null +++ b/src/CoreLibs/stringHandling/string_helpers.py @@ -0,0 +1,34 @@ +""" +String helpers +""" + +from textwrap import shorten + + +def shorten_string(string: str, length: int, hard_shorten: bool = False, placeholder: str = " [~]") -> str: + """ + check if entry is too long and cut it, but only for console output + Note that if there are no spaces in the string, it will automatically use the hard split mode + + Args: + string (str): _description_ + length (int): _description_ + hard_shorten (bool): if shorte should be done on fixed string lenght. Default: False + placeholder (str): placeholder string. Default: " [~]" + + Returns: + str: _description_ + """ + length = int(length) + string = str(string) + if len(string) > length: + if hard_shorten is True or " " not in string: + short_string = f"{string[:(length - len(placeholder))]}{placeholder}" + else: + short_string = shorten(string, width=length, placeholder=placeholder) + else: + short_string = string + + return short_string + +# __END__ diff --git a/uv.lock b/uv.lock new file mode 100644 index 0000000..2b88f9e --- /dev/null +++ b/uv.lock @@ -0,0 +1,42 @@ +version = 1 +revision = 2 +requires-python = ">=3.13" + +[[package]] +name = "corelibs-python" +version = "0.1.0" +source = { virtual = "." } +dependencies = [ + { name = "jmespath" }, + { name = "psutil" }, +] + +[package.metadata] +requires-dist = [ + { name = "jmespath", specifier = ">=1.0.1" }, + { name = "psutil", specifier = ">=7.0.0" }, +] + +[[package]] +name = "jmespath" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843, upload-time = "2022-06-17T18:00:12.224Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" }, +] + +[[package]] +name = "psutil" +version = "7.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003, upload-time = "2025-02-13T21:54:07.946Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051, upload-time = "2025-02-13T21:54:12.36Z" }, + { url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535, upload-time = "2025-02-13T21:54:16.07Z" }, + { url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004, upload-time = "2025-02-13T21:54:18.662Z" }, + { url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986, upload-time = "2025-02-13T21:54:21.811Z" }, + { url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544, upload-time = "2025-02-13T21:54:24.68Z" }, + { url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" }, + { url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" }, +]