Fix base folder name, must be lower case
This commit is contained in:
0
src/corelibs/__init__.py
Normal file
0
src/corelibs/__init__.py
Normal file
0
src/corelibs/csv_handling/__init__.py
Normal file
0
src/corelibs/csv_handling/__init__.py
Normal file
91
src/corelibs/csv_handling/csv_writer.py
Normal file
91
src/corelibs/csv_handling/csv_writer.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""
|
||||
Write to CSV file
|
||||
- each class set is one file write with one header set
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
from pathlib import Path
|
||||
from collections import Counter
|
||||
import csv
|
||||
|
||||
|
||||
class CsvWriter:
|
||||
"""
|
||||
write to a CSV file
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: Path,
|
||||
file_name: str,
|
||||
header: dict[str, str],
|
||||
header_order: list[str] | None = None
|
||||
):
|
||||
self.path = path
|
||||
self.file_name = file_name
|
||||
# Key: index for write for the line dict, Values: header entries
|
||||
self.header = header
|
||||
self.csv_file_writer = self.__open_csv(header_order)
|
||||
|
||||
def __open_csv(self, header_order: list[str] | None) -> 'csv.DictWriter[str] | None':
|
||||
"""
|
||||
open csv file for writing, write headers
|
||||
|
||||
Note that if there is no header_order set we use the order in header dictionary
|
||||
|
||||
Arguments:
|
||||
line {list[str] | None} -- optional dedicated header order
|
||||
|
||||
Returns:
|
||||
csv.DictWriter[str] | None: _description_
|
||||
"""
|
||||
# if header order is set, make sure all header value fields exist
|
||||
header_values = self.header.values()
|
||||
if header_order is not None:
|
||||
if Counter(header_values) != Counter(header_order):
|
||||
print(
|
||||
"header order does not match header values: "
|
||||
f"{', '.join(header_values)} != {', '.join(header_order)}"
|
||||
)
|
||||
return None
|
||||
header_values = header_order
|
||||
# no duplicates
|
||||
if len(header_values) != len(set(header_values)):
|
||||
print(f"Header must have unique values only: {', '.join(header_values)}")
|
||||
return None
|
||||
try:
|
||||
fp = open(
|
||||
self.path.joinpath(self.file_name),
|
||||
"w", encoding="utf-8"
|
||||
)
|
||||
csv_file_writer = csv.DictWriter(
|
||||
fp,
|
||||
fieldnames=header_values,
|
||||
delimiter=",",
|
||||
quotechar='"',
|
||||
quoting=csv.QUOTE_MINIMAL,
|
||||
)
|
||||
csv_file_writer.writeheader()
|
||||
return csv_file_writer
|
||||
except OSError as err:
|
||||
print("OS error:", err)
|
||||
return None
|
||||
|
||||
def write_csv(self, line: dict[str, str]) -> bool:
|
||||
"""
|
||||
write member csv line
|
||||
|
||||
Arguments:
|
||||
line {dict[str, str]} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
if self.csv_file_writer is None:
|
||||
return False
|
||||
csv_row: dict[str, Any] = {}
|
||||
# only write entries that are in the header list
|
||||
for key, value in self.header.items():
|
||||
csv_row[value] = line[key]
|
||||
self.csv_file_writer.writerow(csv_row)
|
||||
return True
|
||||
0
src/corelibs/debug_handling/__init__.py
Normal file
0
src/corelibs/debug_handling/__init__.py
Normal file
126
src/corelibs/debug_handling/profiling.py
Normal file
126
src/corelibs/debug_handling/profiling.py
Normal file
@@ -0,0 +1,126 @@
|
||||
"""
|
||||
Profile memory usage in Python
|
||||
"""
|
||||
|
||||
# https://docs.python.org/3/library/tracemalloc.html
|
||||
|
||||
import os
|
||||
import time
|
||||
import tracemalloc
|
||||
import linecache
|
||||
from typing import Tuple
|
||||
from tracemalloc import Snapshot
|
||||
import psutil
|
||||
|
||||
|
||||
def display_top(snapshot: Snapshot, key_type: str = 'lineno', limit: int = 10) -> str:
|
||||
"""
|
||||
Print tracmalloc stats
|
||||
https://docs.python.org/3/library/tracemalloc.html#pretty-top
|
||||
|
||||
Args:
|
||||
snapshot (Snapshot): _description_
|
||||
key_type (str, optional): _description_. Defaults to 'lineno'.
|
||||
limit (int, optional): _description_. Defaults to 10.
|
||||
"""
|
||||
snapshot = snapshot.filter_traces((
|
||||
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
|
||||
tracemalloc.Filter(False, "<unknown>"),
|
||||
))
|
||||
top_stats = snapshot.statistics(key_type)
|
||||
|
||||
profiler_msg = f"Top {limit} lines"
|
||||
for index, stat in enumerate(top_stats[:limit], 1):
|
||||
frame = stat.traceback[0]
|
||||
# replace "/path/to/module/file.py" with "module/file.py"
|
||||
filename = os.sep.join(frame.filename.split(os.sep)[-2:])
|
||||
profiler_msg += f"#{index}: {filename}:{frame.lineno}: {(stat.size / 1024):.1f} KiB"
|
||||
line = linecache.getline(frame.filename, frame.lineno).strip()
|
||||
if line:
|
||||
profiler_msg += f" {line}"
|
||||
|
||||
other = top_stats[limit:]
|
||||
if other:
|
||||
size = sum(stat.size for stat in other)
|
||||
profiler_msg += f"{len(other)} other: {(size / 1024):.1f} KiB"
|
||||
total = sum(stat.size for stat in top_stats)
|
||||
profiler_msg += f"Total allocated size: {(total / 1024):.1f} KiB"
|
||||
return profiler_msg
|
||||
|
||||
|
||||
class Profiling:
|
||||
"""
|
||||
Profile memory usage and elapsed time for some block
|
||||
Based on: https://stackoverflow.com/a/53301648
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# profiling id
|
||||
self.__ident: str = ''
|
||||
# memory
|
||||
self.__rss_before: int = 0
|
||||
self.__vms_before: int = 0
|
||||
# self.shared_before: int = 0
|
||||
self.__rss_used: int = 0
|
||||
self.__vms_used: int = 0
|
||||
# self.shared_used: int = 0
|
||||
# time
|
||||
self.__call_start: float = 0
|
||||
self.__elapsed = 0
|
||||
|
||||
def __get_process_memory(self) -> Tuple[int, int]:
|
||||
process = psutil.Process(os.getpid())
|
||||
mi = process.memory_info()
|
||||
# macos does not have mi.shared
|
||||
return mi.rss, mi.vms
|
||||
|
||||
def __elapsed_since(self) -> str:
|
||||
elapsed = time.time() - self.__call_start
|
||||
if elapsed < 1:
|
||||
return str(round(elapsed * 1000, 2)) + "ms"
|
||||
if elapsed < 60:
|
||||
return str(round(elapsed, 2)) + "s"
|
||||
if elapsed < 3600:
|
||||
return str(round(elapsed / 60, 2)) + "min"
|
||||
return str(round(elapsed / 3600, 2)) + "hrs"
|
||||
|
||||
def __format_bytes(self, bytes_data: int) -> str:
|
||||
if abs(bytes_data) < 1000:
|
||||
return str(bytes_data) + "B"
|
||||
if abs(bytes_data) < 1e6:
|
||||
return str(round(bytes_data / 1e3, 2)) + "kB"
|
||||
if abs(bytes_data) < 1e9:
|
||||
return str(round(bytes_data / 1e6, 2)) + "MB"
|
||||
return str(round(bytes_data / 1e9, 2)) + "GB"
|
||||
|
||||
def start_profiling(self, ident: str) -> None:
|
||||
"""
|
||||
start the profiling
|
||||
"""
|
||||
self.__ident = ident
|
||||
self.__rss_before, self.__vms_before = self.__get_process_memory()
|
||||
self.__call_start = time.time()
|
||||
|
||||
def end_profiling(self) -> None:
|
||||
"""
|
||||
end the profiling
|
||||
"""
|
||||
if self.__rss_before == 0 and self.__vms_before == 0:
|
||||
print("start_profile() was not called, output will be negative")
|
||||
self.__elapsed = self.__elapsed_since()
|
||||
__rss_after, __vms_after = self.__get_process_memory()
|
||||
self.__rss_used = __rss_after - self.__rss_before
|
||||
self.__vms_used = __vms_after - self.__vms_before
|
||||
|
||||
def print_profiling(self) -> str:
|
||||
"""
|
||||
print the profiling time
|
||||
"""
|
||||
return (
|
||||
f"Profiling: {self.__ident:>20} "
|
||||
f"RSS: {self.__format_bytes(self.__rss_used):>8} | "
|
||||
f"VMS: {self.__format_bytes(self.__vms_used):>8} | "
|
||||
f"time: {self.__elapsed:>8}"
|
||||
)
|
||||
|
||||
# __END__
|
||||
114
src/corelibs/debug_handling/timer.py
Normal file
114
src/corelibs/debug_handling/timer.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""
|
||||
a interval time class
|
||||
|
||||
Returns:
|
||||
Timer: class timer for basic time run calculations
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
class Timer:
|
||||
"""
|
||||
get difference between start and end date/time
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
init new start time and set end time to None
|
||||
"""
|
||||
self._overall_start_time = datetime.now()
|
||||
self._overall_end_time = None
|
||||
self._overall_run_time = None
|
||||
self._start_time = datetime.now()
|
||||
self._end_time = None
|
||||
self._run_time = None
|
||||
|
||||
# MARK: overall run time
|
||||
def overall_run_time(self) -> timedelta:
|
||||
"""
|
||||
overall run time difference from class launch to call of this function
|
||||
|
||||
Returns:
|
||||
timedelta: _description_
|
||||
"""
|
||||
self._overall_end_time = datetime.now()
|
||||
self._overall_run_time = self._overall_end_time - self._overall_start_time
|
||||
return self._overall_run_time
|
||||
|
||||
def get_overall_start_time(self) -> datetime:
|
||||
"""
|
||||
get set start time
|
||||
|
||||
Returns:
|
||||
datetime: _description_
|
||||
"""
|
||||
return self._overall_start_time
|
||||
|
||||
def get_overall_end_time(self) -> datetime | None:
|
||||
"""
|
||||
get set end time or None for not set
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._overall_end_time
|
||||
|
||||
def get_overall_run_time(self) -> timedelta | None:
|
||||
"""
|
||||
get run time or None if run time was not called
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._overall_run_time
|
||||
|
||||
# MARK: set run time
|
||||
def run_time(self) -> timedelta:
|
||||
"""
|
||||
difference between start time and current time
|
||||
|
||||
Returns:
|
||||
datetime: _description_
|
||||
"""
|
||||
self._end_time = datetime.now()
|
||||
self._run_time = self._end_time - self._start_time
|
||||
return self._run_time
|
||||
|
||||
def reset_run_time(self):
|
||||
"""
|
||||
reset start/end and run tine
|
||||
"""
|
||||
self._start_time = datetime.now()
|
||||
self._end_time = None
|
||||
self._run_time = None
|
||||
|
||||
def get_start_time(self) -> datetime:
|
||||
"""
|
||||
get set start time
|
||||
|
||||
Returns:
|
||||
datetime: _description_
|
||||
"""
|
||||
return self._start_time
|
||||
|
||||
def get_end_time(self) -> datetime | None:
|
||||
"""
|
||||
get set end time or None for not set
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._end_time
|
||||
|
||||
def get_run_time(self) -> timedelta | None:
|
||||
"""
|
||||
get run time or None if run time was not called
|
||||
|
||||
Returns:
|
||||
datetime|None: _description_
|
||||
"""
|
||||
return self._run_time
|
||||
|
||||
|
||||
# __END__
|
||||
75
src/corelibs/debug_handling/writeline.py
Normal file
75
src/corelibs/debug_handling/writeline.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""
|
||||
Various small helpers for data writing
|
||||
"""
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
if TYPE_CHECKING:
|
||||
from io import TextIOWrapper
|
||||
|
||||
|
||||
def write_l(line: str, fpl: 'TextIOWrapper | None' = None, print_line: bool = False):
|
||||
"""
|
||||
Write a line to screen and to output file
|
||||
|
||||
Args:
|
||||
line (String): Line to write
|
||||
fpl (Resource): file handler resource, if none write only to console
|
||||
"""
|
||||
if print_line is True:
|
||||
print(line)
|
||||
if fpl is not None:
|
||||
fpl.write(line + "\n")
|
||||
|
||||
|
||||
# progress printers
|
||||
|
||||
def pr_header(tag: str, marker_string: str = '#', width: int = 35):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
tag (str): _description_
|
||||
"""
|
||||
print(f" {marker_string} {tag:^{width}} {marker_string}")
|
||||
|
||||
|
||||
def pr_title(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
tag (str): _description_
|
||||
prefix_string (str, optional): _description_. Defaults to '|'.
|
||||
"""
|
||||
print(f" {prefix_string} {tag:{space_filler}<{width}}:", flush=True)
|
||||
|
||||
|
||||
def pr_open(tag: str, prefix_string: str = '|', space_filler: str = '.', width: int = 35):
|
||||
"""
|
||||
writen progress open line with tag
|
||||
|
||||
Args:
|
||||
tag (str): _description_
|
||||
prefix_string (str): prefix string. Default: '|'
|
||||
"""
|
||||
print(f" {prefix_string} {tag:{space_filler}<{width}} [", end="", flush=True)
|
||||
|
||||
|
||||
def pr_close(tag: str = ''):
|
||||
"""
|
||||
write the close tag with new line
|
||||
|
||||
Args:
|
||||
tag (str, optional): _description_. Defaults to ''.
|
||||
"""
|
||||
print(f"{tag}]", flush=True)
|
||||
|
||||
|
||||
def pr_act(act: str = "."):
|
||||
"""
|
||||
write progress character
|
||||
|
||||
Args:
|
||||
act (str, optional): _description_. Defaults to ".".
|
||||
"""
|
||||
print(f"{act}", end="", flush=True)
|
||||
|
||||
# __EMD__
|
||||
0
src/corelibs/file_handling/__init__.py
Normal file
0
src/corelibs/file_handling/__init__.py
Normal file
44
src/corelibs/file_handling/file_crc.py
Normal file
44
src/corelibs/file_handling/file_crc.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""
|
||||
crc handlers for file CRC
|
||||
"""
|
||||
|
||||
import zlib
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def file_crc(file_path: Path) -> str:
|
||||
"""
|
||||
With for loop and buffer, create file crc32
|
||||
|
||||
Args:
|
||||
file_path (str | Path): _description_
|
||||
|
||||
Returns:
|
||||
str: file crc32
|
||||
"""
|
||||
crc = 0
|
||||
with open(file_path, 'rb', 65536) as ins:
|
||||
for _ in range(int((file_path.stat().st_size / 65536)) + 1):
|
||||
crc = zlib.crc32(ins.read(65536), crc)
|
||||
return f"{crc & 0xFFFFFFFF:08X}"
|
||||
|
||||
|
||||
def file_name_crc(file_path: Path, add_parent_folder: bool = False) -> str:
|
||||
"""
|
||||
either returns file name only from path
|
||||
eg: /foo/bar/baz/file.csv will be file.csv
|
||||
or
|
||||
return the first parent path from path + file
|
||||
eg: /foo/bar/baz/file.csv will be baz/file.csv
|
||||
|
||||
Args:
|
||||
file_path (Path): _description_
|
||||
add_parent_folder (bool, optional): _description_. Defaults to False.
|
||||
|
||||
Returns:
|
||||
str: file name as string
|
||||
"""
|
||||
if add_parent_folder:
|
||||
return str(Path(file_path.parent.name).joinpath(file_path.name))
|
||||
else:
|
||||
return file_path.name
|
||||
46
src/corelibs/file_handling/file_handling.py
Normal file
46
src/corelibs/file_handling/file_handling.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""
|
||||
File handling utilities
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def remove_all_in_directory(directory: Path, ignore_files: list[str] | None = None, verbose: bool = False) -> bool:
|
||||
"""
|
||||
remove all files and folders in a directory
|
||||
can exclude files or folders
|
||||
|
||||
Args:
|
||||
directory (Path): _description_
|
||||
ignore_files (list[str], optional): _description_. Defaults to None.
|
||||
|
||||
Returns:
|
||||
bool: _description_
|
||||
"""
|
||||
if not directory.is_dir():
|
||||
return False
|
||||
if ignore_files is None:
|
||||
ignore_files = []
|
||||
if verbose:
|
||||
print(f"Remove old files in: {directory.name} [", end="", flush=True)
|
||||
# remove all files and folders in given directory by recursive globbing
|
||||
for file in directory.rglob("*"):
|
||||
# skip if in ignore files
|
||||
if file.name in ignore_files:
|
||||
continue
|
||||
# remove one file, or a whole directory
|
||||
if file.is_file():
|
||||
os.remove(file)
|
||||
if verbose:
|
||||
print(".", end="", flush=True)
|
||||
elif file.is_dir():
|
||||
shutil.rmtree(file)
|
||||
if verbose:
|
||||
print("/", end="", flush=True)
|
||||
if verbose:
|
||||
print("]", flush=True)
|
||||
return True
|
||||
|
||||
# __END__
|
||||
477
src/corelibs/file_handling/progress.py
Normal file
477
src/corelibs/file_handling/progress.py
Normal file
@@ -0,0 +1,477 @@
|
||||
"""
|
||||
AUTHOR: Clemens Schwaighofer
|
||||
DATE CREATED: 2009/7/24 (2025/7/2)
|
||||
DESCRIPTION: progress percent class (perl -> python)
|
||||
|
||||
HOW TO USE
|
||||
* load
|
||||
from progress import Progress
|
||||
* init
|
||||
prg = Progress()
|
||||
allowed parameters to pass are (in order)
|
||||
- verbose (0/1/...) : show output
|
||||
- precision (-2~10) : -2 (5%), -1 (10%), 0 (normal 0-100%), 1~10 (100.m~%)
|
||||
- microtime (1/0/-1) : show microtime in eta/run time
|
||||
- wide time (bool) : padd time so all time column doesn't change width of line
|
||||
- prefix line break (bool): add line break before string and not only after
|
||||
prg = Progress(verbose = 1, precision = 2)
|
||||
* settings methods
|
||||
set_wide_time(bool)
|
||||
set_microtime(int -1/0/1)
|
||||
set_prefix_lb(bool)
|
||||
set_verbose(0/1 int)
|
||||
set_precision(-2~10 int)
|
||||
set_linecount(int)
|
||||
set_filesize(int)
|
||||
set_start_time(time optional)
|
||||
set_eta_start_time(time optional)
|
||||
set_end_time(time optional)
|
||||
show_position(file pos optional)
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import Literal
|
||||
from math import floor
|
||||
from corelibs.string_handling.datetime_helpers import convert_timestamp
|
||||
from corelibs.string_handling.byte_helpers import format_bytes
|
||||
|
||||
|
||||
class Progress():
|
||||
"""
|
||||
file progress output information
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
verbose: int = 0,
|
||||
precision: int = 1,
|
||||
microtime: Literal[-1] | Literal[1] | Literal[0] = 0,
|
||||
wide_time: bool = False,
|
||||
prefix_lb: bool = False
|
||||
):
|
||||
# set default var stuff
|
||||
# max lines in input
|
||||
self.linecount: int = 0
|
||||
# max file size
|
||||
self.filesize: int = 0
|
||||
# * comma after percent
|
||||
self.precision: int = 0
|
||||
# * if flagged 1, then wthe wide 15 char left bound format is used
|
||||
self.wide_time: bool = False
|
||||
# * verbose status from outside
|
||||
self.verbose: bool = False
|
||||
# * microtime output for last run time (1 for enable 0 for auto -1 for disable)
|
||||
self.microtime: Literal[-1] | Literal[1] | Literal[0] = 0
|
||||
# micro time flag for last group
|
||||
self.lg_microtime: bool = False
|
||||
# = flag if output was given
|
||||
self.change = 0
|
||||
# = global start for the full script running time
|
||||
self.start: float | None = None
|
||||
# = for the eta time, can be set after a query or long read in, to not create a wrong ETA time
|
||||
self.start_run: float | None = None
|
||||
# loop start
|
||||
self.start_time: float | None = None
|
||||
# global end
|
||||
self.end: float | None = None
|
||||
# loop end
|
||||
self.end_time: float | None = None
|
||||
# run time in seconds, set when end time method is called
|
||||
self.run_time: float | None = None
|
||||
# = filesize current
|
||||
self.count_size: int | None = None
|
||||
# position current
|
||||
self.count: int = 0
|
||||
# last count (position)
|
||||
self.current_count: int = 0
|
||||
# the current file post
|
||||
self.file_pos: int | None = None
|
||||
# lines processed in the last run
|
||||
self.lines_processed: int = 0
|
||||
# time in th seconds for the last group run (until percent change)
|
||||
self.last_group: float = 0
|
||||
# float value, lines processed per second to the last group run
|
||||
self.lines_in_last_group: float = 0
|
||||
# float values, lines processed per second to complete run
|
||||
self.lines_in_global: float = 0
|
||||
# flaot value, bytes processes per second in the last group run
|
||||
self.bytes_in_last_group: float = 0
|
||||
# float value, bytes processed per second to complete run
|
||||
self.bytes_in_global: float = 0
|
||||
# bytes processed in last run (in bytes)
|
||||
self.size_in_last_group: int = 0
|
||||
# current file position 8size)
|
||||
self.current_size: int = 0
|
||||
# last percent position
|
||||
self.last_percent: int | float = 0
|
||||
# if we have normal % or in steps of 10
|
||||
self.precision_ten_step: int = 0
|
||||
# the default size this is precision + 4
|
||||
self.percent_print: int = 5
|
||||
# this is 1 if it is 1 or 0 for precision or precision size
|
||||
self.percent_precision: int = 1
|
||||
# prefix line with a line break
|
||||
self.prefix_lb: bool = False
|
||||
# estimated time to finish
|
||||
self.eta: float | None = None
|
||||
# run time since start
|
||||
self.full_time_needed: float | None = None
|
||||
# the actual output
|
||||
self.string: str = ''
|
||||
|
||||
# initialize the class
|
||||
self.set_precision(precision)
|
||||
self.set_verbose(verbose)
|
||||
self.set_micro_time(microtime)
|
||||
self.set_wide_time(wide_time)
|
||||
self.set_prefix_lb(prefix_lb)
|
||||
self.set_start_time()
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
resets the current progress to 0, but keeps the overall start variables set
|
||||
"""
|
||||
# reset what always gets reset
|
||||
self.count = 0
|
||||
self.count_size = None
|
||||
self.current_count = 0
|
||||
self.linecount = 0
|
||||
self.lines_processed = 0
|
||||
self.last_group = 0
|
||||
self.lines_in_last_group = 0
|
||||
self.lines_in_global = 0
|
||||
self.bytes_in_last_group = 0
|
||||
self.bytes_in_global = 0
|
||||
self.size_in_last_group = 0
|
||||
self.filesize = 0
|
||||
self.current_size = 0
|
||||
self.last_percent = 0
|
||||
self.eta = 0
|
||||
self.full_time_needed = 0
|
||||
self.start_run = None
|
||||
self.start_time = None
|
||||
self.end_time = None
|
||||
|
||||
def set_wide_time(self, wide_time: bool) -> bool:
|
||||
"""
|
||||
sets the show wide time flag
|
||||
|
||||
Arguments:
|
||||
wide_time {bool} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
self.wide_time = wide_time
|
||||
return self.wide_time
|
||||
|
||||
def set_micro_time(self, microtime: Literal[-1] | Literal[1] | Literal[0]) -> Literal[-1] | Literal[1] | Literal[0]:
|
||||
"""sets the show microtime -1 OFF, 0 AUTO, 1 ON
|
||||
|
||||
Returns:
|
||||
_type_ -- _description_
|
||||
"""
|
||||
self.microtime = microtime
|
||||
return self.microtime
|
||||
|
||||
def set_prefix_lb(self, prefix_lb: bool) -> bool:
|
||||
"""
|
||||
set prefix line break flag
|
||||
|
||||
Arguments:
|
||||
prefix_lb {bool} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
self.prefix_lb = prefix_lb
|
||||
return self.prefix_lb
|
||||
|
||||
def set_verbose(self, verbose: int) -> bool:
|
||||
"""
|
||||
set the internal verbose flag to 1 if any value higher than 1 is given, else sets it to 0
|
||||
|
||||
Arguments:
|
||||
verbose {int} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
if verbose > 0:
|
||||
self.verbose = True
|
||||
else:
|
||||
self.verbose = False
|
||||
return self.verbose
|
||||
|
||||
def set_precision(self, precision: int) -> int:
|
||||
"""
|
||||
sets the output precision size. If -2 for five step, -1 for ten step
|
||||
else sets the precision normally, for 0, no precision is set, maximum precision is 10
|
||||
|
||||
Arguments:
|
||||
precision {int} -- _description_
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
# if not a valid number, we set it to 0
|
||||
if precision < -2 or precision > 10:
|
||||
precision = 0
|
||||
if precision < 0:
|
||||
if precision < -1:
|
||||
self.precision_ten_step = 5
|
||||
else:
|
||||
self.precision_ten_step = 10
|
||||
self.precision = 0 # no comma
|
||||
self.percent_precision = 0 # no print precision
|
||||
self.percent_print = 3 # max 3 length
|
||||
else:
|
||||
# comma values visible
|
||||
self.precision = 10 if precision < 0 or precision > 10 else precision
|
||||
# for calcualtion of precision
|
||||
self.percent_precision = 10 if precision < 0 or precision > 10 else precision
|
||||
# for the format output base is 4, plsut he percent precision length
|
||||
self.percent_print = (3 if precision == 0 else 4) + self.percent_precision
|
||||
# return the set precision
|
||||
return self.precision
|
||||
|
||||
def set_linecount(self, linecount: int) -> int:
|
||||
"""
|
||||
set the maximum lines in this file, if value is smaller than 0 or 0, then it is set to 1
|
||||
|
||||
Arguments:
|
||||
linecount {int} -- _description_
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
if linecount > 0:
|
||||
self.linecount = linecount
|
||||
else:
|
||||
self.linecount = 1
|
||||
return self.linecount
|
||||
|
||||
def set_filesize(self, filesize: int) -> int:
|
||||
"""
|
||||
set the maximum filesize for this file, if value is smaller than 0 or 0, then it is set to 1
|
||||
|
||||
Arguments:
|
||||
filesize {int} -- _description_
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
if filesize > 0:
|
||||
self.filesize = filesize
|
||||
else:
|
||||
self.filesize = 1
|
||||
return self.filesize
|
||||
|
||||
def set_start_time(self, time_value: float = time.time()) -> None:
|
||||
"""
|
||||
initial set of the start times, auto set
|
||||
|
||||
Keyword Arguments:
|
||||
time_value {float} -- _description_ (default: {time.time()})
|
||||
"""
|
||||
# avoid possible double set of the original start time
|
||||
if not self.start:
|
||||
self.start = time_value
|
||||
self.start_time = time_value
|
||||
self.start_run = time_value
|
||||
|
||||
def set_eta_start_time(self, time_value: float = time.time()) -> None:
|
||||
"""
|
||||
sets the loop % run time, for correct ETA calculation
|
||||
calls set start time, as the main start time is only set once
|
||||
|
||||
Keyword Arguments:
|
||||
time_value {float} -- _description_ (default: {time.time()})
|
||||
"""
|
||||
self.set_start_time(time_value)
|
||||
|
||||
def set_end_time(self, time_value: float = time.time()) -> None:
|
||||
"""
|
||||
set the end time
|
||||
|
||||
Keyword Arguments:
|
||||
time_value {float} -- _description_ (default: {time.time()})
|
||||
"""
|
||||
self.end = time_value
|
||||
self.end_time = time_value
|
||||
if self.start is None:
|
||||
self.start = 0
|
||||
# the overall run time in micro seconds
|
||||
self.run_time = self.end - self.start
|
||||
|
||||
def show_position(self, filepos: int = 0) -> str:
|
||||
"""
|
||||
processes the current position. either based on read the file size pos, or the line count
|
||||
|
||||
Keyword Arguments:
|
||||
filepos {int} -- _description_ (default: {0})
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
show_filesize = True # if we print from file size or line count
|
||||
# microtime flags
|
||||
eta_microtime = False
|
||||
ftn_microtime = False
|
||||
lg_microtime = False
|
||||
# percent precision calc
|
||||
# _p_spf = "{:." + str(self.precision) + "f}"
|
||||
# output format for percent
|
||||
_pr_p_spf = "{:>" + str(self.percent_print) + "." + str(self.percent_precision) + "f}"
|
||||
# set the linecount precision based on the final linecount, if not, leave it empty
|
||||
_pr_lc = "{}"
|
||||
if self.linecount:
|
||||
_pr_lc = "{:>" + str(len(str(f"{self.linecount:,}"))) + ",}"
|
||||
# time format, if flag is set, the wide format is used
|
||||
_pr_tf = "{}"
|
||||
if self.wide_time:
|
||||
_pr_tf = "{:>15}"
|
||||
|
||||
# count up
|
||||
self.count += 1
|
||||
# if we have file pos from parameter
|
||||
if filepos != 0:
|
||||
self.file_pos = filepos
|
||||
else:
|
||||
# we did not, so we set internal value
|
||||
self.file_pos = self.count
|
||||
# we also check if the filesize was set now
|
||||
if self.filesize == 0:
|
||||
self.filesize = self.linecount
|
||||
# set ignore filesize output (no data)
|
||||
show_filesize = False
|
||||
# set the count size based on the file pos, is only used if we have filesize
|
||||
self.count_size = self.file_pos
|
||||
# do normal or down to 10 (0, 10, ...) %
|
||||
if self.precision_ten_step:
|
||||
_percent = int((self.file_pos / float(self.filesize)) * 100)
|
||||
mod = _percent % self.precision_ten_step
|
||||
percent = _percent if mod == 0 else self.last_percent
|
||||
else:
|
||||
# calc percent
|
||||
percent = round(((self.file_pos / float(self.filesize)) * 100), self.precision)
|
||||
|
||||
# output
|
||||
if percent != self.last_percent:
|
||||
self.end_time = time.time() # current time (for loop time)
|
||||
if self.start is None:
|
||||
self.start = 0
|
||||
if self.start_time is None:
|
||||
self.start_time = 0
|
||||
# for from the beginning
|
||||
full_time_needed = self.end_time - self.start # how long from the start
|
||||
self.last_group = self.end_time - self.start_time # how long for last loop
|
||||
self.lines_processed = self.count - self.current_count # how many lines processed
|
||||
# lines in last group
|
||||
self.lines_in_last_group = (self.lines_processed / self.last_group) if self.last_group else 0
|
||||
# lines in global
|
||||
self.lines_in_global = (self.count / full_time_needed) if full_time_needed else 0
|
||||
# if we have linecount or not
|
||||
if self.linecount == 0:
|
||||
full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count_size # how long for all
|
||||
# estimate for the rest
|
||||
eta = full_time_per_line * (self.filesize - self.count_size)
|
||||
else:
|
||||
# how long for all
|
||||
full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count
|
||||
# estimate for the rest
|
||||
eta = full_time_per_line * (self.linecount - self.count)
|
||||
# just in case ...
|
||||
if eta < 0:
|
||||
eta = 0
|
||||
# check if to show microtime
|
||||
# ON
|
||||
if self.microtime == 1:
|
||||
eta_microtime = ftn_microtime = lg_microtime = True
|
||||
# AUTO
|
||||
if self.microtime == 0:
|
||||
if eta > 0 and eta < 1:
|
||||
eta_microtime = True
|
||||
if full_time_needed > 0 and full_time_needed < 1:
|
||||
ftn_microtime = True
|
||||
# pre check last group: if pre comma part is same add microtime anyway
|
||||
if self.last_group > 0 and self.last_group < 1:
|
||||
lg_microtime = True
|
||||
if self.last_group == floor(self.last_group):
|
||||
lg_microtime = True
|
||||
self.last_group = floor(self.last_group)
|
||||
# if with filesize or without
|
||||
if show_filesize:
|
||||
# last group size
|
||||
self.size_in_last_group = self.count_size - self.current_size
|
||||
# calc kb/s if there is any filesize data
|
||||
# last group
|
||||
self.bytes_in_last_group = (self.size_in_last_group / self.last_group) if self.last_group else 0
|
||||
# global
|
||||
self.bytes_in_global = (self.count_size / full_time_needed) if full_time_needed else 0
|
||||
# only used if we run with file size for the next check
|
||||
self.current_size = self.count_size
|
||||
|
||||
if self.verbose >= 1:
|
||||
self.string = (
|
||||
f"Processed {_pr_p_spf}% "
|
||||
"[{} / {}] | "
|
||||
f"{_pr_lc} / {_pr_lc} Lines | ETA: {_pr_tf} / TR: {_pr_tf} / "
|
||||
"LR: {:,} "
|
||||
"lines ({:,}) in {}, {:,.2f} ({:,.2f}) lines/s, {} ({}) b/s"
|
||||
).format(
|
||||
float(percent),
|
||||
format_bytes(self.count_size),
|
||||
format_bytes(self.filesize),
|
||||
self.count,
|
||||
self.linecount,
|
||||
convert_timestamp(eta, eta_microtime),
|
||||
convert_timestamp(full_time_needed, ftn_microtime),
|
||||
self.lines_processed,
|
||||
self.size_in_last_group,
|
||||
convert_timestamp(self.last_group, lg_microtime),
|
||||
self.lines_in_global,
|
||||
self.lines_in_last_group,
|
||||
format_bytes(self.bytes_in_global),
|
||||
format_bytes(self.bytes_in_last_group)
|
||||
)
|
||||
else:
|
||||
if self.verbose >= 1:
|
||||
self.string = (
|
||||
f"Processed {_pr_p_spf}% | {_pr_lc} / {_pr_lc} Lines "
|
||||
f"| ETA: {_pr_tf} / TR: {_pr_tf} / "
|
||||
"LR: {:,} lines in {}, {:,.2f} ({:,.2f}) lines/s"
|
||||
).format(
|
||||
float(percent),
|
||||
self.count,
|
||||
self.linecount,
|
||||
convert_timestamp(eta, eta_microtime),
|
||||
convert_timestamp(full_time_needed, ftn_microtime),
|
||||
self.lines_processed,
|
||||
convert_timestamp(self.last_group, lg_microtime),
|
||||
self.lines_in_global,
|
||||
self.lines_in_last_group
|
||||
)
|
||||
# prefix return string with line break if flagged
|
||||
self.string = ("\n" if self.prefix_lb else '') + self.string
|
||||
# print the string if verbose is turned on
|
||||
if self.verbose >= 1:
|
||||
print(self.string)
|
||||
|
||||
# write back vars
|
||||
self.last_percent = percent
|
||||
self.eta = eta
|
||||
self.full_time_needed = full_time_needed
|
||||
self.lg_microtime = lg_microtime
|
||||
# for the next run, check data
|
||||
self.start_time = time.time()
|
||||
self.current_count = self.count
|
||||
# trigger if this is a change
|
||||
self.change = 1
|
||||
else:
|
||||
# trigger if this is a change
|
||||
self.change = 0
|
||||
# return string
|
||||
return self.string
|
||||
# } END OF ShowPosition
|
||||
|
||||
# __END__
|
||||
0
src/corelibs/json_handling/__init__.py
Normal file
0
src/corelibs/json_handling/__init__.py
Normal file
35
src/corelibs/json_handling/jmespath_helper.py
Normal file
35
src/corelibs/json_handling/jmespath_helper.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""
|
||||
helper functions for jmespath interfaces
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
import jmespath
|
||||
import jmespath.exceptions
|
||||
|
||||
|
||||
def jmespath_search(search_data: dict[Any, Any] | list[Any], search_params: str) -> Any:
|
||||
"""
|
||||
jmespath search wrapper
|
||||
|
||||
Args:
|
||||
search_data (dict | list): _description_
|
||||
search_params (str): _description_
|
||||
|
||||
Raises:
|
||||
ValueError: jmespath.exceptions.LexerError
|
||||
ValueError: jmespath.exceptions.ParseError
|
||||
|
||||
Returns:
|
||||
Any: dict/list/etc, None if nothing found
|
||||
"""
|
||||
try:
|
||||
search_result = jmespath.search(search_params, search_data)
|
||||
except jmespath.exceptions.LexerError as excp:
|
||||
raise ValueError(f"Compile failed: {search_params}: {excp}") from excp
|
||||
except jmespath.exceptions.ParseError as excp:
|
||||
raise ValueError(f"Parse failed: {search_params}: {excp}") from excp
|
||||
except TypeError as excp:
|
||||
raise ValueError(f"Type error for search_params: {excp}") from excp
|
||||
return search_result
|
||||
|
||||
# __END__
|
||||
31
src/corelibs/json_handling/json_helper.py
Normal file
31
src/corelibs/json_handling/json_helper.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""
|
||||
json encoder for datetime
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
from json import JSONEncoder
|
||||
from datetime import datetime, date
|
||||
|
||||
|
||||
# subclass JSONEncoder
|
||||
class DateTimeEncoder(JSONEncoder):
|
||||
"""
|
||||
Override the default method
|
||||
cls=DateTimeEncoder
|
||||
"""
|
||||
def default(self, o: Any) -> str | None:
|
||||
if isinstance(o, (date, datetime)):
|
||||
return o.isoformat()
|
||||
return None
|
||||
|
||||
|
||||
def default(obj: Any) -> str | None:
|
||||
"""
|
||||
default override
|
||||
default=default
|
||||
"""
|
||||
if isinstance(obj, (date, datetime)):
|
||||
return obj.isoformat()
|
||||
return None
|
||||
|
||||
# __END__
|
||||
0
src/corelibs/list_dict_handling/__init__.py
Normal file
0
src/corelibs/list_dict_handling/__init__.py
Normal file
128
src/corelibs/list_dict_handling/data_search.py
Normal file
128
src/corelibs/list_dict_handling/data_search.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""
|
||||
wrapper around search path
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def array_search(
|
||||
search_params: list[dict[str, str | bool | list[str | None]]],
|
||||
data: list[dict[str, Any]],
|
||||
return_index: bool = False
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
search in an array of dicts with an array of Key/Value set
|
||||
all Key/Value sets must match
|
||||
Value set can be list for OR match
|
||||
option: case_senstive: default True
|
||||
|
||||
Args:
|
||||
search_params (list): List of search params in "Key"/"Value" lists with options
|
||||
data (list): data to search in, must be a list
|
||||
return_index (bool): return index of list [default False]
|
||||
|
||||
Raises:
|
||||
ValueError: if search params is not a list
|
||||
KeyError: if Key or Value are missing in search params
|
||||
KeyError: More than one Key with the same name set
|
||||
|
||||
Returns:
|
||||
list: list of found elements, or if return index
|
||||
list of dics with "index" and "data", where "data" holds the result list
|
||||
"""
|
||||
if not isinstance(search_params, list): # type: ignore
|
||||
raise ValueError("search_params must be a list")
|
||||
keys = []
|
||||
for search in search_params:
|
||||
if not search.get('Key') or not search.get('Value'):
|
||||
raise KeyError(
|
||||
f"Either Key '{search.get('Key', '')}' or "
|
||||
f"Value '{search.get('Value', '')}' is missing or empty"
|
||||
)
|
||||
# if double key -> abort
|
||||
if search.get("Key") in keys:
|
||||
raise KeyError(
|
||||
f"Key {search.get('Key', '')} already exists in search_params"
|
||||
)
|
||||
|
||||
return_items: list[dict[str, Any]] = []
|
||||
for si_idx, search_item in enumerate(data):
|
||||
# for each search entry, all must match
|
||||
matching = 0
|
||||
for search in search_params:
|
||||
# either Value direct or if Value is list then any of those items can match
|
||||
# values are compared in lower case if case senstive is off
|
||||
# lower case left side
|
||||
# TODO: allow nested Keys. eg "Key: ["Key a", "key b"]" to be ["Key a"]["key b"]
|
||||
if search.get("case_sensitive", True) is False:
|
||||
search_value = search_item.get(str(search['Key']), "").lower()
|
||||
else:
|
||||
search_value = search_item.get(str(search['Key']), "")
|
||||
# lower case right side
|
||||
if isinstance(search['Value'], list):
|
||||
search_in = [
|
||||
str(k).lower()
|
||||
if search.get("case_sensitive", True) is False else k
|
||||
for k in search['Value']
|
||||
]
|
||||
elif search.get("case_sensitive", True) is False:
|
||||
search_in = str(search['Value']).lower()
|
||||
else:
|
||||
search_in = search['Value']
|
||||
# compare check
|
||||
if (
|
||||
(
|
||||
isinstance(search_in, list) and
|
||||
search_value in search_in
|
||||
) or
|
||||
search_value == search_in
|
||||
):
|
||||
matching += 1
|
||||
if len(search_params) == matching:
|
||||
if return_index is True:
|
||||
# the data is now in "data sub set"
|
||||
return_items.append({
|
||||
"index": si_idx,
|
||||
"data": search_item
|
||||
})
|
||||
else:
|
||||
return_items.append(search_item)
|
||||
# return all found or empty list
|
||||
return return_items
|
||||
|
||||
|
||||
def key_lookup(haystack: dict[str, str], key: str) -> str:
|
||||
"""
|
||||
simple key lookup in haystack, erturns empty string if not found
|
||||
|
||||
Args:
|
||||
haystack (dict[str, str]): _description_
|
||||
key (str): _description_
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
"""
|
||||
return haystack.get(key, "")
|
||||
|
||||
|
||||
def value_lookup(haystack: dict[str, str], value: str, raise_on_many: bool = False) -> str:
|
||||
"""
|
||||
find by value, if not found returns empty, if not raise on many returns the first one
|
||||
|
||||
Args:
|
||||
haystack (dict[str, str]): _description_
|
||||
value (str): _description_
|
||||
raise_on_many (bool, optional): _description_. Defaults to False.
|
||||
|
||||
Raises:
|
||||
ValueError: _description_
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
"""
|
||||
keys = [__key for __key, __value in haystack.items() if __value == value]
|
||||
if not keys:
|
||||
return ""
|
||||
if raise_on_many is True and len(keys) > 1:
|
||||
raise ValueError("More than one element found with the same name")
|
||||
return keys[0]
|
||||
21
src/corelibs/list_dict_handling/dump_data.py
Normal file
21
src/corelibs/list_dict_handling/dump_data.py
Normal file
@@ -0,0 +1,21 @@
|
||||
"""
|
||||
dict dump as JSON formatted
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
|
||||
def dump_data(data: dict[Any, Any] | list[Any] | str | None) -> str:
|
||||
"""
|
||||
dump formated output from dict/list
|
||||
|
||||
Args:
|
||||
data (dict | list | str): _description_
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
"""
|
||||
return json.dumps(data, indent=4, ensure_ascii=False, default=str)
|
||||
|
||||
# __END__
|
||||
37
src/corelibs/list_dict_handling/fingerprint.py
Normal file
37
src/corelibs/list_dict_handling/fingerprint.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Various dictionary, object and list hashers
|
||||
"""
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
from typing import Any
|
||||
|
||||
|
||||
def dict_hash_frozen(data: dict[Any, Any]) -> int:
|
||||
"""
|
||||
hash a dict via freeze
|
||||
|
||||
Args:
|
||||
data (dict): _description_
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
"""
|
||||
return hash(frozenset(data.items()))
|
||||
|
||||
|
||||
def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str:
|
||||
"""
|
||||
Create a sha256 hash over dict
|
||||
alternative for
|
||||
dict_hash_frozen
|
||||
|
||||
Args:
|
||||
data (dict | list): _description_
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
"""
|
||||
return hashlib.sha256(
|
||||
json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8')
|
||||
).hexdigest()
|
||||
61
src/corelibs/list_dict_handling/manage_dict.py
Normal file
61
src/corelibs/list_dict_handling/manage_dict.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""
|
||||
Various helper functions for type data clean up
|
||||
"""
|
||||
|
||||
from typing import Any, cast
|
||||
|
||||
|
||||
def delete_keys_from_set(
|
||||
set_data: dict[str, Any] | list[Any] | str, keys: list[str]
|
||||
) -> dict[str, Any] | list[Any] | Any:
|
||||
"""
|
||||
remove all keys from set_data
|
||||
|
||||
Args:
|
||||
set_data (dict[str, Any] | list[Any] | None): _description_
|
||||
keys (list[str]): _description_
|
||||
|
||||
Returns:
|
||||
dict[str, Any] | list[Any] | None: _description_
|
||||
"""
|
||||
# skip everything if there is no keys list
|
||||
if not keys:
|
||||
return set_data
|
||||
if isinstance(set_data, dict):
|
||||
for key, value in set_data.copy().items():
|
||||
if key in keys:
|
||||
del set_data[key]
|
||||
if isinstance(value, (dict, list)):
|
||||
delete_keys_from_set(value, keys) # type: ignore Partly unknown
|
||||
elif isinstance(set_data, list):
|
||||
for value in set_data:
|
||||
if isinstance(value, (dict, list)):
|
||||
delete_keys_from_set(value, keys) # type: ignore Partly unknown
|
||||
else:
|
||||
set_data = [set_data]
|
||||
|
||||
return set_data
|
||||
|
||||
|
||||
def build_dict(
|
||||
any_dict: Any, ignore_entries: list[str] | None = None
|
||||
) -> dict[str, Any | list[Any] | dict[Any, Any]]:
|
||||
"""
|
||||
rewrite any AWS *TypeDef to new dict so we can add/change entrys
|
||||
|
||||
Args:
|
||||
any_dict (Any): _description_
|
||||
|
||||
Returns:
|
||||
dict[str, Any | list[Any]]: _description_
|
||||
"""
|
||||
if ignore_entries is None:
|
||||
return cast(dict[str, Any | list[Any] | dict[Any, Any]], any_dict)
|
||||
# ignore entries can be one key or key nested
|
||||
# return {
|
||||
# key: value for key, value in any_dict.items() if key not in ignore_entries
|
||||
# }
|
||||
return cast(
|
||||
dict[str, Any | list[Any] | dict[Any, Any]],
|
||||
delete_keys_from_set(any_dict, ignore_entries)
|
||||
)
|
||||
0
src/corelibs/logging_handling/__init__.py
Normal file
0
src/corelibs/logging_handling/__init__.py
Normal file
89
src/corelibs/logging_handling/error_handling.py
Normal file
89
src/corelibs/logging_handling/error_handling.py
Normal file
@@ -0,0 +1,89 @@
|
||||
"""
|
||||
Collect error and warning messages as JSON blocks into an array
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ErrorMessage:
|
||||
"""
|
||||
Error and Warning collector
|
||||
"""
|
||||
|
||||
# errors and warning
|
||||
__error_list: list[dict[str, Any]] = []
|
||||
__warning_list: list[dict[str, Any]] = []
|
||||
|
||||
def reset_warnings(self):
|
||||
"""
|
||||
reset warnings
|
||||
"""
|
||||
ErrorMessage.__warning_list = []
|
||||
|
||||
def add_warning(self, message: dict[str, Any], base_message: dict[str, Any] | None = None):
|
||||
"""
|
||||
add one warning
|
||||
|
||||
Args:
|
||||
message (dict): _description_
|
||||
base_message (dict, optional): _description_. Defaults to {}.
|
||||
"""
|
||||
if base_message is None or not isinstance(base_message, dict): # type: ignore
|
||||
base_message = {}
|
||||
base_message['level'] = "Warning"
|
||||
ErrorMessage.__warning_list.append(base_message | message)
|
||||
|
||||
def get_warnings(self) -> list[dict[str, Any]]:
|
||||
"""_summary_
|
||||
|
||||
Returns:
|
||||
list: _description_
|
||||
"""
|
||||
return ErrorMessage.__warning_list
|
||||
|
||||
def has_warnings(self) -> bool:
|
||||
"""
|
||||
check if there ware warnings
|
||||
|
||||
Returns:
|
||||
bool: _description_
|
||||
"""
|
||||
return bool(ErrorMessage.__warning_list)
|
||||
|
||||
def reset_errors(self):
|
||||
"""
|
||||
reset the error list
|
||||
"""
|
||||
ErrorMessage.__error_list = []
|
||||
|
||||
def add_error(self, message: dict[str, Any], base_messasge: dict[str, Any] | None = None):
|
||||
"""
|
||||
add one error
|
||||
|
||||
Args:
|
||||
error (dict): _description_
|
||||
base_error (dict, optional): _description_. Defaults to {}.
|
||||
"""
|
||||
if base_messasge is None or not isinstance(base_messasge, dict): # type: ignore
|
||||
base_messasge = {}
|
||||
base_messasge['level'] = "Error"
|
||||
ErrorMessage.__error_list.append(base_messasge | message)
|
||||
|
||||
def get_errors(self) -> list[dict[str, Any]]:
|
||||
"""_summary_
|
||||
|
||||
Returns:
|
||||
list: _description_
|
||||
"""
|
||||
return ErrorMessage.__error_list
|
||||
|
||||
def has_errors(self) -> bool:
|
||||
"""
|
||||
check if there ware warnings
|
||||
|
||||
Returns:
|
||||
bool: _description_
|
||||
"""
|
||||
return bool(ErrorMessage.__error_list)
|
||||
|
||||
# __END__
|
||||
120
src/corelibs/logging_handling/log.py
Normal file
120
src/corelibs/logging_handling/log.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""
|
||||
A log handler wrapper
|
||||
"""
|
||||
|
||||
import logging.handlers
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Mapping
|
||||
|
||||
|
||||
class Log:
|
||||
"""
|
||||
logger setup
|
||||
"""
|
||||
|
||||
EXCEPTION: int = 60
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
log_path: Path,
|
||||
log_name: str,
|
||||
log_level_console: str = 'WARNING',
|
||||
log_level_file: str = 'DEBUG',
|
||||
add_start_info: bool = True
|
||||
):
|
||||
logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
|
||||
if not log_name.endswith('.log'):
|
||||
log_path = log_path.with_suffix('.log')
|
||||
# overall logger settings
|
||||
self.logger = logging.getLogger(log_name)
|
||||
# set maximum logging level for all logging output
|
||||
self.logger.setLevel(logging.DEBUG)
|
||||
# console logger
|
||||
self.__console_handler(log_level_console)
|
||||
# file logger
|
||||
self.__file_handler(log_level_file, log_path)
|
||||
# if requests set a start log
|
||||
if add_start_info is True:
|
||||
self.break_line('START')
|
||||
|
||||
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
|
||||
return record.levelname != "EXCEPTION"
|
||||
|
||||
def __console_handler(self, log_level_console: str = 'WARNING'):
|
||||
# console logger
|
||||
if not isinstance(getattr(logging, log_level_console.upper(), None), int):
|
||||
log_level_console = 'WARNING'
|
||||
console_handler = logging.StreamHandler()
|
||||
formatter_console = logging.Formatter(
|
||||
(
|
||||
'[%(asctime)s.%(msecs)03d] '
|
||||
'[%(filename)s:%(funcName)s:%(lineno)d] '
|
||||
'<%(levelname)s> '
|
||||
'%(message)s'
|
||||
),
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
console_handler.setLevel(log_level_console)
|
||||
# do not show exceptions logs on console
|
||||
console_handler.addFilter(self.__filter_exceptions)
|
||||
console_handler.setFormatter(formatter_console)
|
||||
self.logger.addHandler(console_handler)
|
||||
|
||||
def __file_handler(self, log_level_file: str, log_path: Path) -> None:
|
||||
# file logger
|
||||
if not isinstance(getattr(logging, log_level_file.upper(), None), int):
|
||||
log_level_file = 'DEBUG'
|
||||
file_handler = logging.handlers.TimedRotatingFileHandler(
|
||||
filename=log_path,
|
||||
encoding="utf-8",
|
||||
when="D",
|
||||
interval=1
|
||||
)
|
||||
formatter_file_handler = logging.Formatter(
|
||||
(
|
||||
'[%(asctime)s.%(msecs)03d] '
|
||||
'[%(pathname)s:%(funcName)s:%(lineno)d] '
|
||||
'[%(name)s:%(process)d] '
|
||||
'<%(levelname)s> '
|
||||
'%(message)s'
|
||||
),
|
||||
datefmt="%Y-%m-%dT%H:%M:%S",
|
||||
)
|
||||
file_handler.setLevel(log_level_file)
|
||||
file_handler.setFormatter(formatter_file_handler)
|
||||
self.logger.addHandler(file_handler)
|
||||
|
||||
def break_line(self, info: str = "BREAK"):
|
||||
"""
|
||||
add a break line as info level
|
||||
|
||||
Keyword Arguments:
|
||||
info {str} -- _description_ (default: {"BREAK"})
|
||||
"""
|
||||
self.logger.info("[%s] ================================>", info)
|
||||
|
||||
def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
|
||||
"""
|
||||
log on exceotion level
|
||||
|
||||
Args:
|
||||
msg (object): _description_
|
||||
*args (object): arguments for msg
|
||||
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
|
||||
"""
|
||||
self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)
|
||||
|
||||
def validate_log_level(self, log_level: str) -> bool:
|
||||
"""
|
||||
if the log level is invalid, will erturn false
|
||||
|
||||
Args:
|
||||
log_level (str): _description_
|
||||
|
||||
Returns:
|
||||
bool: _description_
|
||||
"""
|
||||
return isinstance(getattr(logging, log_level.upper(), None), int)
|
||||
|
||||
# __END__
|
||||
0
src/corelibs/py.typed
Normal file
0
src/corelibs/py.typed
Normal file
0
src/corelibs/requests_handling/__init__.py
Normal file
0
src/corelibs/requests_handling/__init__.py
Normal file
190
src/corelibs/requests_handling/caller.py
Normal file
190
src/corelibs/requests_handling/caller.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""
|
||||
requests lib interface
|
||||
V2 call type
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
import warnings
|
||||
import requests
|
||||
# to hide the verfiy warnings because of the bad SSL settings from Netskope, Akamai, etc
|
||||
warnings.filterwarnings('ignore', message='Unverified HTTPS request')
|
||||
|
||||
|
||||
class Caller:
|
||||
"""_summary_"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
header: dict[str, str],
|
||||
verify: bool = True,
|
||||
timeout: int = 20,
|
||||
proxy: dict[str, str] | None = None
|
||||
):
|
||||
self.headers = header
|
||||
self.timeout: int = timeout
|
||||
self.cafile = "/Library/Application Support/Netskope/STAgent/data/nscacert.pem"
|
||||
self.verify = verify
|
||||
self.proxy = proxy
|
||||
|
||||
def __timeout(self, timeout: int | None) -> int:
|
||||
if timeout is not None:
|
||||
return timeout
|
||||
return self.timeout
|
||||
|
||||
def __call(
|
||||
self,
|
||||
action: str,
|
||||
url: str,
|
||||
data: dict[str, Any] | None = None,
|
||||
params: dict[str, Any] | None = None,
|
||||
timeout: int | None = None
|
||||
) -> requests.Response | None:
|
||||
"""
|
||||
call wrapper, on error returns None
|
||||
|
||||
Args:
|
||||
action (str): _description_
|
||||
url (str): _description_
|
||||
data (dict | None): _description_. Defaults to None.
|
||||
params (dict | None): _description_. Defaults to None.
|
||||
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
|
||||
if data is None:
|
||||
data = {}
|
||||
try:
|
||||
response = None
|
||||
if action == "get":
|
||||
response = requests.get(
|
||||
url,
|
||||
params=params,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
)
|
||||
elif action == "post":
|
||||
response = requests.post(
|
||||
url,
|
||||
params=params,
|
||||
json=data,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
)
|
||||
elif action == "put":
|
||||
response = requests.put(
|
||||
url,
|
||||
params=params,
|
||||
json=data,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
)
|
||||
elif action == "patch":
|
||||
response = requests.patch(
|
||||
url,
|
||||
params=params,
|
||||
json=data,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
)
|
||||
elif action == "delete":
|
||||
response = requests.delete(
|
||||
url,
|
||||
params=params,
|
||||
headers=self.headers,
|
||||
timeout=self.__timeout(timeout),
|
||||
verify=self.verify,
|
||||
proxies=self.proxy
|
||||
)
|
||||
return response
|
||||
except requests.exceptions.InvalidSchema as e:
|
||||
print(f"Invalid URL during '{action}' for {url}:\n\t{e}")
|
||||
return None
|
||||
except requests.exceptions.ReadTimeout as e:
|
||||
print(f"Timeout ({self.timeout}s) during '{action}' for {url}:\n\t{e}")
|
||||
return None
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print(f"Connection error during '{action}' for {url}:\n\t{e}")
|
||||
return None
|
||||
|
||||
def get(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
|
||||
"""
|
||||
get data
|
||||
|
||||
Args:
|
||||
url (str): _description_
|
||||
params (dict | None): _description_
|
||||
|
||||
Returns:
|
||||
requests.Response: _description_
|
||||
"""
|
||||
return self.__call('get', url, params=params)
|
||||
|
||||
def post(
|
||||
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
|
||||
) -> requests.Response | None:
|
||||
"""
|
||||
post data
|
||||
|
||||
Args:
|
||||
url (str): _description_
|
||||
data (dict | None): _description_
|
||||
params (dict | None): _description_
|
||||
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('post', url, data, params)
|
||||
|
||||
def put(
|
||||
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
|
||||
) -> requests.Response | None:
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
url (str): _description_
|
||||
data (dict | None): _description_
|
||||
params (dict | None): _description_
|
||||
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('put', url, data, params)
|
||||
|
||||
def patch(
|
||||
self, url: str, data: dict[str, Any] | None = None, params: dict[str, Any] | None = None
|
||||
) -> requests.Response | None:
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
url (str): _description_
|
||||
data (dict | None): _description_
|
||||
params (dict | None): _description_
|
||||
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('patch', url, data, params)
|
||||
|
||||
def delete(self, url: str, params: dict[str, Any] | None = None) -> requests.Response | None:
|
||||
"""
|
||||
delete
|
||||
|
||||
Args:
|
||||
url (str): _description_
|
||||
params (dict | None): _description_
|
||||
|
||||
Returns:
|
||||
requests.Response | None: _description_
|
||||
"""
|
||||
return self.__call('delete', url, params=params)
|
||||
|
||||
# __END__
|
||||
95
src/corelibs/script_handling/script_helpers.py
Normal file
95
src/corelibs/script_handling/script_helpers.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""
|
||||
Helper methods for scripts
|
||||
"""
|
||||
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import psutil
|
||||
|
||||
|
||||
def wait_abort(sleep: int = 5) -> None:
|
||||
"""
|
||||
wait a certain time for an abort command
|
||||
|
||||
Keyword Arguments:
|
||||
sleep {int} -- _description_ (default: {5})
|
||||
"""
|
||||
try:
|
||||
print(f"Waiting {sleep} seconds (Press CTRL +C to abort) [", end="", flush=True)
|
||||
for _ in range(1, sleep):
|
||||
print(".", end="", flush=True)
|
||||
time.sleep(1)
|
||||
print("]", flush=True)
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted by user")
|
||||
sys.exit(0)
|
||||
print("\n\n")
|
||||
|
||||
|
||||
def lock_run(lock_file: Path) -> None:
|
||||
"""
|
||||
lock a script run
|
||||
needed is the lock file name
|
||||
|
||||
Arguments:
|
||||
lock_file {Path} -- _description_
|
||||
|
||||
Raises:
|
||||
IOError: _description_
|
||||
Exception: _description_
|
||||
IOError: _description_
|
||||
"""
|
||||
no_file = False
|
||||
run_pid = os.getpid()
|
||||
# or os.path.isfile()
|
||||
try:
|
||||
with open(lock_file, "r", encoding="UTF-8") as fp:
|
||||
exists = False
|
||||
pid = fp.read()
|
||||
fp.close()
|
||||
if pid:
|
||||
# check if this pid exists
|
||||
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
|
||||
try:
|
||||
if pid == proc.info['pid']:
|
||||
exists = True
|
||||
break
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||
# in case we cannot access
|
||||
continue
|
||||
if not exists:
|
||||
# no pid but lock file, unlink
|
||||
try:
|
||||
lock_file.unlink()
|
||||
no_file = True
|
||||
except IOError as e:
|
||||
raise IOError(f"Cannot remove lock_file: {lock_file}: {e}") from e
|
||||
else:
|
||||
raise IOError(f"Script is already running with PID {pid}")
|
||||
except IOError:
|
||||
no_file = True
|
||||
if no_file:
|
||||
try:
|
||||
with open(lock_file, "w", encoding="UTF-8") as fp:
|
||||
fp.write(str(run_pid))
|
||||
fp.close()
|
||||
except IOError as e:
|
||||
raise IOError(f"Cannot open run lock file '{lock_file}' for writing: {e}") from e
|
||||
|
||||
|
||||
def unlock_run(lock_file: Path) -> None:
|
||||
"""
|
||||
removes the lock file
|
||||
|
||||
Arguments:
|
||||
lock_file {Path} -- _description_
|
||||
|
||||
Raises:
|
||||
Exception: _description_
|
||||
"""
|
||||
try:
|
||||
lock_file.unlink()
|
||||
except IOError as e:
|
||||
raise IOError(f"Cannot remove lock_file: {lock_file}: {e}") from e
|
||||
0
src/corelibs/string_handling/__init__.py
Normal file
0
src/corelibs/string_handling/__init__.py
Normal file
37
src/corelibs/string_handling/byte_helpers.py
Normal file
37
src/corelibs/string_handling/byte_helpers.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""
|
||||
Format bytes
|
||||
"""
|
||||
|
||||
|
||||
def format_bytes(byte_value: float | int | str) -> str:
|
||||
"""
|
||||
Format a byte value to a human readable string
|
||||
|
||||
Arguments:
|
||||
byte_value {float | int | str} -- _description_
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
# if string exit
|
||||
if isinstance(byte_value, str):
|
||||
return byte_value
|
||||
# empty byte value is set to 0
|
||||
if not byte_value:
|
||||
byte_value = float(0)
|
||||
# if not float, convert to flaot
|
||||
if isinstance(byte_value, int):
|
||||
byte_value = float(byte_value)
|
||||
# loop through valid extensions
|
||||
for unit in ["B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"]:
|
||||
# never go into the negativ and check if it is smaller than next set
|
||||
# if it is, print out return string
|
||||
if abs(byte_value) < 1024.0:
|
||||
return f"{byte_value:,.2f} {unit}"
|
||||
# divided for the next loop check
|
||||
byte_value /= 1024.0
|
||||
# if it is too big, return YB
|
||||
return f"{byte_value:,.2f} YB"
|
||||
|
||||
|
||||
# __NED__
|
||||
64
src/corelibs/string_handling/datetime_helpers.py
Normal file
64
src/corelibs/string_handling/datetime_helpers.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""
|
||||
Various string based date/time helpers
|
||||
"""
|
||||
|
||||
from math import floor
|
||||
import time
|
||||
|
||||
|
||||
def convert_timestamp(timestamp: float | int, show_micro: bool = True) -> str:
|
||||
"""
|
||||
format timestamp into human readable format
|
||||
|
||||
Arguments:
|
||||
timestamp {float} -- _description_
|
||||
|
||||
Keyword Arguments:
|
||||
show_micro {bool} -- _description_ (default: {True})
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
# cut of the ms, but first round them up to four
|
||||
__timestamp_ms_split = str(round(timestamp, 4)).split(".")
|
||||
timestamp = int(__timestamp_ms_split[0])
|
||||
try:
|
||||
ms = int(__timestamp_ms_split[1])
|
||||
except IndexError:
|
||||
ms = 0
|
||||
timegroups = (86400, 3600, 60, 1)
|
||||
output: list[int] = []
|
||||
for i in timegroups:
|
||||
output.append(int(floor(timestamp / i)))
|
||||
timestamp = timestamp % i
|
||||
# output has days|hours|min|sec ms
|
||||
time_string = ""
|
||||
if output[0]:
|
||||
time_string = f"{output[0]}d"
|
||||
if output[0] or output[1]:
|
||||
time_string += f"{output[1]}h "
|
||||
if output[0] or output[1] or output[2]:
|
||||
time_string += f"{output[2]}m "
|
||||
time_string += f"{output[3]}s"
|
||||
if show_micro:
|
||||
time_string += f" {ms}ms" if ms else " 0ms"
|
||||
return time_string
|
||||
|
||||
|
||||
def create_time(timestamp: float, timestamp_format: str = "%Y-%m-%d %H:%M:%S") -> str:
|
||||
"""
|
||||
just takes a timestamp and prints out humand readable format
|
||||
|
||||
Arguments:
|
||||
timestamp {float} -- _description_
|
||||
|
||||
Keyword Arguments:
|
||||
timestamp_format {_type_} -- _description_ (default: {"%Y-%m-%d %H:%M:%S"})
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
return time.strftime(timestamp_format, time.localtime(timestamp))
|
||||
|
||||
|
||||
# __END__
|
||||
226
src/corelibs/string_handling/double_byte_string_format.py
Normal file
226
src/corelibs/string_handling/double_byte_string_format.py
Normal file
@@ -0,0 +1,226 @@
|
||||
"""
|
||||
Format double byte strings to exact length
|
||||
"""
|
||||
|
||||
import unicodedata
|
||||
|
||||
|
||||
class DoubleByteFormatString:
|
||||
"""
|
||||
Format a string to exact length
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
string: str,
|
||||
cut_length: int,
|
||||
format_length: int | None = None,
|
||||
placeholder: str = '..',
|
||||
format_string: str = '{{:<{len}}}'
|
||||
):
|
||||
"""
|
||||
shorts a string to exact cut length and sets it to format length
|
||||
|
||||
after "cut_length" cut the "placeholder" will be added, so that the new cut_length is never
|
||||
larget than the cut_length given (".." is counted to cut_length)
|
||||
if format_length if set and outside format_length will be set
|
||||
the cut_length is adjusted to format_length if the format_length is shorter
|
||||
|
||||
Example
|
||||
|
||||
"Foo bar baz" 10 charcters -> 5 cut_length -> 10 format_length
|
||||
"Foo.. "
|
||||
|
||||
use class.get_string_short() for cut length shortend string
|
||||
use class.get_string_short_formated() to get the shorted string to format length padding
|
||||
|
||||
creates a class that shortens and sets the format length
|
||||
to use with a print format run the format needs to be pre set in
|
||||
the style of {{:<{len}}} style
|
||||
self.get_string_short_formated() for the "len" parameter
|
||||
|
||||
Args:
|
||||
string (str): string to work with
|
||||
cut_length (int): width to shorten to
|
||||
format_length (int | None): format length. Defaults to None
|
||||
placeholder (str, optional): placeholder to put after shortened string. Defaults to '..'.
|
||||
format_string (str, optional): format string. Defaults to '{{:<{len}}}'
|
||||
"""
|
||||
# output variables
|
||||
self.string_short: str = ''
|
||||
self.string_width_value: int = 0
|
||||
self.string_short_width: int = 0
|
||||
self.format_length_value: int = 0
|
||||
# internal varaibles
|
||||
self.placeholder: str = placeholder
|
||||
# original string
|
||||
self.string: str = ''
|
||||
# width to cut string to
|
||||
self.cut_length: int = 0
|
||||
# format length to set to
|
||||
self.format_length: int = 0
|
||||
# main string
|
||||
self.string = str(string)
|
||||
|
||||
self.format_string: str = format_string
|
||||
|
||||
# if width is > 0 set, else set width of string (fallback)
|
||||
if cut_length > 0:
|
||||
self.cut_length = cut_length
|
||||
elif cut_length <= 0:
|
||||
self.cut_length = self.__string_width_calc(self.string)
|
||||
# format length set, if not set or smaller than 0, set to width of string
|
||||
self.format_length = self.cut_length
|
||||
if format_length is not None and format_length > 0:
|
||||
self.format_length = format_length
|
||||
# check that width is not larger then length if yes, set width to length
|
||||
self.cut_length = min(self.cut_length, self.format_length)
|
||||
|
||||
# process the string shorten and format length calculation
|
||||
self.process()
|
||||
|
||||
def process(self):
|
||||
"""
|
||||
runs all the class methods to set string length, the string shortened
|
||||
and the format length
|
||||
"""
|
||||
# call the internal ones to set the data
|
||||
if self.string:
|
||||
self.__string_width()
|
||||
self.__shorten_string()
|
||||
if self.format_length:
|
||||
self.__format_length()
|
||||
|
||||
def get_string_short(self) -> str:
|
||||
"""
|
||||
get the shortend string
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
return self.string_short
|
||||
|
||||
def get_string_short_formated(self, format_string: str = '{{:<{len}}}') -> str:
|
||||
"""
|
||||
get the formatted string
|
||||
|
||||
Keyword Arguments:
|
||||
format_string {_type_} -- _description_ (default: {'{{:<{len}}}'})
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
if not format_string:
|
||||
format_string = self.format_string
|
||||
return format_string.format(
|
||||
len=self.get_format_length()
|
||||
).format(
|
||||
self.get_string_short()
|
||||
)
|
||||
|
||||
def get_format_length(self) -> int:
|
||||
"""
|
||||
get the format length for outside length set
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
return self.format_length_value
|
||||
|
||||
def get_cut_length(self) -> int:
|
||||
"""
|
||||
get the actual cut length
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
return self.cut_length
|
||||
|
||||
def get_requested_cut_length(self) -> int:
|
||||
"""
|
||||
get the requested cut length
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
return self.cut_length
|
||||
|
||||
def get_requested_format_length(self) -> int:
|
||||
"""
|
||||
get the requested format length
|
||||
|
||||
Returns:
|
||||
int -- _description_
|
||||
"""
|
||||
return self.format_length
|
||||
|
||||
def __string_width_calc(self, string: str) -> int:
|
||||
"""
|
||||
does the actual string width calculation
|
||||
|
||||
Args:
|
||||
string (str): string to calculate from
|
||||
|
||||
Returns:
|
||||
int: stringth width
|
||||
"""
|
||||
return sum(1 + (unicodedata.east_asian_width(c) in "WF") for c in string)
|
||||
|
||||
def __string_width(self):
|
||||
"""
|
||||
calculates the string width based on the characters
|
||||
this is an internal method and should not be called on itself
|
||||
"""
|
||||
# only run if string is set and is valid string
|
||||
if self.string:
|
||||
# calculate width. add +1 for each double byte character
|
||||
self.string_width_value = self.__string_width_calc(self.string)
|
||||
|
||||
def __format_length(self):
|
||||
"""
|
||||
set the format length based on the length for the format
|
||||
and the shortend string
|
||||
this is an internal method and should not be called on itself
|
||||
"""
|
||||
if not self.string_short:
|
||||
self.__shorten_string()
|
||||
# get correct format length based on string
|
||||
if (
|
||||
self.string_short and
|
||||
self.format_length > 0 and
|
||||
self.string_short_width > 0
|
||||
):
|
||||
# length: format length wanted
|
||||
# substract the width of the shortend string - the length of the shortend string
|
||||
self.format_length_value = self.format_length - (self.string_short_width - len(self.string_short))
|
||||
else:
|
||||
# if we have nothing to shorten the length, keep the old one
|
||||
self.format_length_value = self.format_length
|
||||
|
||||
def __shorten_string(self):
|
||||
"""
|
||||
shorten string down to set width
|
||||
this is an internal method and should not be called on itself
|
||||
"""
|
||||
# set string width if not set
|
||||
if not self.string_width_value:
|
||||
self.__string_width()
|
||||
# if the double byte string width is larger than the wanted width
|
||||
if self.string_width_value > self.cut_length:
|
||||
cur_len = 0
|
||||
self.string_short = ''
|
||||
for char in str(self.string):
|
||||
# set the current length if we add the character
|
||||
cur_len += 2 if unicodedata.east_asian_width(char) in "WF" else 1
|
||||
# if the new length is smaller than the output length to shorten too add the char
|
||||
if cur_len <= (self.cut_length - len(self.placeholder)):
|
||||
self.string_short += char
|
||||
self.string_short_width = cur_len
|
||||
# return string with new width and placeholder
|
||||
self.string_short = f"{self.string_short}{self.placeholder}"
|
||||
self.string_short_width += len(self.placeholder)
|
||||
else:
|
||||
# if string is same saze just copy
|
||||
self.string_short = self.string
|
||||
|
||||
# __END__
|
||||
38
src/corelibs/string_handling/hash_helpers.py
Normal file
38
src/corelibs/string_handling/hash_helpers.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""
|
||||
Various hash helpers for strings and things
|
||||
"""
|
||||
|
||||
import re
|
||||
import hashlib
|
||||
|
||||
|
||||
def crc32b_fix(crc: str) -> str:
|
||||
"""
|
||||
fix a CRC32B with wrong order (from old PHP)
|
||||
|
||||
Arguments:
|
||||
crc {str} -- _description_
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
# left pad with 0 to 8 chars
|
||||
crc = ("0" * (8 - len(crc))) + crc
|
||||
# flip two chars (byte hex)
|
||||
crc = re.sub(
|
||||
r"^([a-z0-9]{2})([a-z0-9]{2})([a-z0-9]{2})([a-z0-9]{2})$", r"\4\3\2\1", crc
|
||||
)
|
||||
return crc
|
||||
|
||||
|
||||
def sha1_short(string: str) -> str:
|
||||
"""
|
||||
Return a 9 character long SHA1 part
|
||||
|
||||
Arguments:
|
||||
string {str} -- _description_
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
return hashlib.sha1(string.encode('utf-8')).hexdigest()[:9]
|
||||
86
src/corelibs/string_handling/string_helpers.py
Normal file
86
src/corelibs/string_handling/string_helpers.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""
|
||||
String helpers
|
||||
"""
|
||||
|
||||
from textwrap import shorten
|
||||
|
||||
|
||||
def shorten_string(string: str, length: int, hard_shorten: bool = False, placeholder: str = " [~]") -> str:
|
||||
"""
|
||||
check if entry is too long and cut it, but only for console output
|
||||
Note that if there are no spaces in the string, it will automatically use the hard split mode
|
||||
|
||||
Args:
|
||||
string (str): _description_
|
||||
length (int): _description_
|
||||
hard_shorten (bool): if shorte should be done on fixed string lenght. Default: False
|
||||
placeholder (str): placeholder string. Default: " [~]"
|
||||
|
||||
Returns:
|
||||
str: _description_
|
||||
"""
|
||||
length = int(length)
|
||||
string = str(string)
|
||||
if len(string) > length:
|
||||
if hard_shorten is True or " " not in string:
|
||||
short_string = f"{string[:(length - len(placeholder))]}{placeholder}"
|
||||
else:
|
||||
short_string = shorten(string, width=length, placeholder=placeholder)
|
||||
else:
|
||||
short_string = string
|
||||
|
||||
return short_string
|
||||
|
||||
|
||||
def left_fill(string: str, width: int, char: str = " ") -> str:
|
||||
"""
|
||||
left fill for a certain length to fill a max size
|
||||
string is the original string to left padd, width is the maximum width
|
||||
that needs to be filled, char is the filler character
|
||||
|
||||
Arguments:
|
||||
string {str} -- _description_
|
||||
width {int} -- _description_
|
||||
|
||||
Keyword Arguments:
|
||||
char {str} -- _description_ (default: {" "})
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
# the width needs to be string
|
||||
if width < 0:
|
||||
width = len(string)
|
||||
# char can only be one length long
|
||||
if len(char) != 1:
|
||||
char = " "
|
||||
return (
|
||||
"{:"
|
||||
f"{char}>{width}"
|
||||
"}"
|
||||
).format(string)
|
||||
|
||||
|
||||
def format_number(number: float, precision: int = 0) -> str:
|
||||
"""
|
||||
format numbers, current trailing zeros does not work
|
||||
use {:,} or {:,.f} or {:,.<N>f} <N> = number instead of this
|
||||
|
||||
Arguments:
|
||||
number {float} -- _description_
|
||||
|
||||
Keyword Arguments:
|
||||
precision {int} -- _description_ (default: {0})
|
||||
|
||||
Returns:
|
||||
str -- _description_
|
||||
"""
|
||||
if precision < 0 and precision > 100:
|
||||
precision = 0
|
||||
return (
|
||||
"{:,."
|
||||
f"{str(precision)}"
|
||||
"f}"
|
||||
).format(number)
|
||||
|
||||
# __END__
|
||||
Reference in New Issue
Block a user