478 lines
18 KiB
Python
478 lines
18 KiB
Python
"""
|
|
AUTHOR: Clemens Schwaighofer
|
|
DATE CREATED: 2009/7/24 (2025/7/2)
|
|
DESCRIPTION: progress percent class (perl -> python)
|
|
|
|
HOW TO USE
|
|
* load
|
|
from progress import Progress
|
|
* init
|
|
prg = Progress()
|
|
allowed parameters to pass are (in order)
|
|
- verbose (0/1/...) : show output
|
|
- precision (-2~10) : -2 (5%), -1 (10%), 0 (normal 0-100%), 1~10 (100.m~%)
|
|
- microtime (1/0/-1) : show microtime in eta/run time
|
|
- wide time (bool) : pad the time columns so they do not change the width of the line
|
|
- prefix line break (bool): add line break before string and not only after
|
|
prg = Progress(verbose = 1, precision = 2)
|
|
* settings methods
|
|
set_wide_time(bool)
|
|
set_micro_time(int -1/0/1)
|
|
set_prefix_lb(bool)
|
|
set_verbose(0/1 int)
|
|
set_precision(-2~10 int)
|
|
set_linecount(int)
|
|
set_filesize(int)
|
|
set_start_time(time optional)
|
|
set_eta_start_time(time optional)
|
|
set_end_time(time optional)
|
|
show_position(file pos optional)
|
|
"""
|
|
|
|
import time
|
|
from typing import Literal
|
|
from math import floor
|
|
from corelibs.datetime_handling.timestamp_convert import convert_timestamp
|
|
from corelibs.string_handling.byte_helpers import format_bytes
|
|
|
|
|
|
class Progress():
|
|
"""
|
|
file progress output information
|
|
"""
|
|
def __init__(
|
|
self,
|
|
verbose: int = 0,
|
|
precision: int = 1,
|
|
microtime: Literal[-1] | Literal[1] | Literal[0] = 0,
|
|
wide_time: bool = False,
|
|
prefix_lb: bool = False
|
|
):
|
|
# set default var stuff
|
|
# max lines in input
|
|
self.linecount: int = 0
|
|
# max file size
|
|
self.filesize: int = 0
|
|
# * comma after percent
|
|
self.precision: int = 0
|
|
# * if flagged 1, then wthe wide 15 char left bound format is used
|
|
self.wide_time: bool = False
|
|
# * verbose status from outside
|
|
self.verbose: bool = False
|
|
# * microtime output for last run time (1 for enable 0 for auto -1 for disable)
|
|
self.microtime: Literal[-1] | Literal[1] | Literal[0] = 0
|
|
# micro time flag for last group
|
|
self.lg_microtime: bool = False
|
|
# = flag if output was given
|
|
self.change = 0
|
|
# = global start for the full script running time
|
|
self.start: float | None = None
|
|
# = for the eta time, can be set after a query or long read in, to not create a wrong ETA time
|
|
self.start_run: float | None = None
|
|
# loop start
|
|
self.start_time: float | None = None
|
|
# global end
|
|
self.end: float | None = None
|
|
# loop end
|
|
self.end_time: float | None = None
|
|
# run time in seconds, set when end time method is called
|
|
self.run_time: float | None = None
|
|
# = filesize current
|
|
self.count_size: int | None = None
|
|
# position current
|
|
self.count: int = 0
|
|
# last count (position)
|
|
self.current_count: int = 0
|
|
# the current file post
|
|
self.file_pos: int | None = None
|
|
# lines processed in the last run
|
|
self.lines_processed: int = 0
|
|
# time in th seconds for the last group run (until percent change)
|
|
self.last_group: float = 0
|
|
# float value, lines processed per second to the last group run
|
|
self.lines_in_last_group: float = 0
|
|
# float values, lines processed per second to complete run
|
|
self.lines_in_global: float = 0
|
|
# flaot value, bytes processes per second in the last group run
|
|
self.bytes_in_last_group: float = 0
|
|
# float value, bytes processed per second to complete run
|
|
self.bytes_in_global: float = 0
|
|
# bytes processed in last run (in bytes)
|
|
self.size_in_last_group: int = 0
|
|
# current file position 8size)
|
|
self.current_size: int = 0
|
|
# last percent position
|
|
self.last_percent: int | float = 0
|
|
# if we have normal % or in steps of 10
|
|
self.precision_ten_step: int = 0
|
|
# the default size this is precision + 4
|
|
self.percent_print: int = 5
|
|
# this is 1 if it is 1 or 0 for precision or precision size
|
|
self.percent_precision: int = 1
|
|
# prefix line with a line break
|
|
self.prefix_lb: bool = False
|
|
# estimated time to finish
|
|
self.eta: float | None = None
|
|
# run time since start
|
|
self.full_time_needed: float | None = None
|
|
# the actual output
|
|
self.string: str = ''
|
|
|
|
# initialize the class
|
|
self.set_precision(precision)
|
|
self.set_verbose(verbose)
|
|
self.set_micro_time(microtime)
|
|
self.set_wide_time(wide_time)
|
|
self.set_prefix_lb(prefix_lb)
|
|
self.set_start_time()
|
|
|
|
def reset(self):
|
|
"""
|
|
resets the current progress to 0, but keeps the overall start variables set
|
|
"""
|
|
# reset what always gets reset
|
|
self.count = 0
|
|
self.count_size = None
|
|
self.current_count = 0
|
|
self.linecount = 0
|
|
self.lines_processed = 0
|
|
self.last_group = 0
|
|
self.lines_in_last_group = 0
|
|
self.lines_in_global = 0
|
|
self.bytes_in_last_group = 0
|
|
self.bytes_in_global = 0
|
|
self.size_in_last_group = 0
|
|
self.filesize = 0
|
|
self.current_size = 0
|
|
self.last_percent = 0
|
|
self.eta = 0
|
|
self.full_time_needed = 0
|
|
self.start_run = None
|
|
self.start_time = None
|
|
self.end_time = None
|
|
|
|
def set_wide_time(self, wide_time: bool) -> bool:
|
|
"""
|
|
sets the show wide time flag
|
|
|
|
Arguments:
|
|
wide_time {bool} -- _description_
|
|
|
|
Returns:
|
|
bool -- _description_
|
|
"""
|
|
self.wide_time = wide_time
|
|
return self.wide_time
|
|
|
|
def set_micro_time(self, microtime: Literal[-1] | Literal[1] | Literal[0]) -> Literal[-1] | Literal[1] | Literal[0]:
|
|
"""sets the show microtime -1 OFF, 0 AUTO, 1 ON
|
|
|
|
Returns:
|
|
_type_ -- _description_
|
|
"""
|
|
self.microtime = microtime
|
|
return self.microtime
|
|
|
|
def set_prefix_lb(self, prefix_lb: bool) -> bool:
|
|
"""
|
|
set prefix line break flag
|
|
|
|
Arguments:
|
|
prefix_lb {bool} -- _description_
|
|
|
|
Returns:
|
|
bool -- _description_
|
|
"""
|
|
self.prefix_lb = prefix_lb
|
|
return self.prefix_lb
|
|
|
|
def set_verbose(self, verbose: int) -> bool:
|
|
"""
|
|
set the internal verbose flag to 1 if any value higher than 1 is given, else sets it to 0
|
|
|
|
Arguments:
|
|
verbose {int} -- _description_
|
|
|
|
Returns:
|
|
bool -- _description_
|
|
"""
|
|
if verbose > 0:
|
|
self.verbose = True
|
|
else:
|
|
self.verbose = False
|
|
return self.verbose
|
|
|
|
def set_precision(self, precision: int) -> int:
|
|
"""
|
|
sets the output precision size. If -2 for five step, -1 for ten step
|
|
else sets the precision normally, for 0, no precision is set, maximum precision is 10
|
|
|
|
Arguments:
|
|
precision {int} -- _description_
|
|
|
|
Returns:
|
|
int -- _description_
|
|
"""
|
|
# if not a valid number, we set it to 0
|
|
if precision < -2 or precision > 10:
|
|
precision = 0
|
|
if precision < 0:
|
|
if precision < -1:
|
|
self.precision_ten_step = 5
|
|
else:
|
|
self.precision_ten_step = 10
|
|
self.precision = 0 # no comma
|
|
self.percent_precision = 0 # no print precision
|
|
self.percent_print = 3 # max 3 length
|
|
else:
|
|
# comma values visible
|
|
self.precision = 10 if precision < 0 or precision > 10 else precision
|
|
# for calcualtion of precision
|
|
self.percent_precision = 10 if precision < 0 or precision > 10 else precision
|
|
# for the format output base is 4, plsut he percent precision length
|
|
self.percent_print = (3 if precision == 0 else 4) + self.percent_precision
|
|
# return the set precision
|
|
return self.precision
|
|
|
|
def set_linecount(self, linecount: int) -> int:
|
|
"""
|
|
set the maximum lines in this file, if value is smaller than 0 or 0, then it is set to 1
|
|
|
|
Arguments:
|
|
linecount {int} -- _description_
|
|
|
|
Returns:
|
|
int -- _description_
|
|
"""
|
|
if linecount > 0:
|
|
self.linecount = linecount
|
|
else:
|
|
self.linecount = 1
|
|
return self.linecount
|
|
|
|
def set_filesize(self, filesize: int) -> int:
|
|
"""
|
|
set the maximum filesize for this file, if value is smaller than 0 or 0, then it is set to 1
|
|
|
|
Arguments:
|
|
filesize {int} -- _description_
|
|
|
|
Returns:
|
|
int -- _description_
|
|
"""
|
|
if filesize > 0:
|
|
self.filesize = filesize
|
|
else:
|
|
self.filesize = 1
|
|
return self.filesize
|
|
|
|
def set_start_time(self, time_value: float = time.time()) -> None:
|
|
"""
|
|
initial set of the start times, auto set
|
|
|
|
Keyword Arguments:
|
|
time_value {float} -- _description_ (default: {time.time()})
|
|
"""
|
|
# avoid possible double set of the original start time
|
|
if not self.start:
|
|
self.start = time_value
|
|
self.start_time = time_value
|
|
self.start_run = time_value
|
|
|
|
def set_eta_start_time(self, time_value: float = time.time()) -> None:
|
|
"""
|
|
sets the loop % run time, for correct ETA calculation
|
|
calls set start time, as the main start time is only set once
|
|
|
|
Keyword Arguments:
|
|
time_value {float} -- _description_ (default: {time.time()})
|
|
"""
|
|
self.set_start_time(time_value)
|
|
|
|
def set_end_time(self, time_value: float = time.time()) -> None:
|
|
"""
|
|
set the end time
|
|
|
|
Keyword Arguments:
|
|
time_value {float} -- _description_ (default: {time.time()})
|
|
"""
|
|
self.end = time_value
|
|
self.end_time = time_value
|
|
if self.start is None:
|
|
self.start = 0
|
|
# the overall run time in micro seconds
|
|
self.run_time = self.end - self.start
|
|
|
|
def show_position(self, filepos: int = 0) -> str:
|
|
"""
|
|
processes the current position. either based on read the file size pos, or the line count
|
|
|
|
Keyword Arguments:
|
|
filepos {int} -- _description_ (default: {0})
|
|
|
|
Returns:
|
|
str -- _description_
|
|
"""
|
|
show_filesize = True # if we print from file size or line count
|
|
# microtime flags
|
|
eta_microtime = False
|
|
ftn_microtime = False
|
|
lg_microtime = False
|
|
# percent precision calc
|
|
# _p_spf = "{:." + str(self.precision) + "f}"
|
|
# output format for percent
|
|
_pr_p_spf = "{:>" + str(self.percent_print) + "." + str(self.percent_precision) + "f}"
|
|
# set the linecount precision based on the final linecount, if not, leave it empty
|
|
_pr_lc = "{}"
|
|
if self.linecount:
|
|
_pr_lc = "{:>" + str(len(str(f"{self.linecount:,}"))) + ",}"
|
|
# time format, if flag is set, the wide format is used
|
|
_pr_tf = "{}"
|
|
if self.wide_time:
|
|
_pr_tf = "{:>15}"
|
|
|
|
# count up
|
|
self.count += 1
|
|
# if we have file pos from parameter
|
|
if filepos != 0:
|
|
self.file_pos = filepos
|
|
else:
|
|
# we did not, so we set internal value
|
|
self.file_pos = self.count
|
|
# we also check if the filesize was set now
|
|
if self.filesize == 0:
|
|
self.filesize = self.linecount
|
|
# set ignore filesize output (no data)
|
|
show_filesize = False
|
|
# set the count size based on the file pos, is only used if we have filesize
|
|
self.count_size = self.file_pos
|
|
# do normal or down to 10 (0, 10, ...) %
|
|
if self.precision_ten_step:
|
|
_percent = int((self.file_pos / float(self.filesize)) * 100)
|
|
mod = _percent % self.precision_ten_step
|
|
percent = _percent if mod == 0 else self.last_percent
|
|
else:
|
|
# calc percent
|
|
percent = round(((self.file_pos / float(self.filesize)) * 100), self.precision)
|
|
|
|
# output
|
|
if percent != self.last_percent:
|
|
self.end_time = time.time() # current time (for loop time)
|
|
if self.start is None:
|
|
self.start = 0
|
|
if self.start_time is None:
|
|
self.start_time = 0
|
|
# for from the beginning
|
|
full_time_needed = self.end_time - self.start # how long from the start
|
|
self.last_group = self.end_time - self.start_time # how long for last loop
|
|
self.lines_processed = self.count - self.current_count # how many lines processed
|
|
# lines in last group
|
|
self.lines_in_last_group = (self.lines_processed / self.last_group) if self.last_group else 0
|
|
# lines in global
|
|
self.lines_in_global = (self.count / full_time_needed) if full_time_needed else 0
|
|
# if we have linecount or not
|
|
if self.linecount == 0:
|
|
full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count_size # how long for all
|
|
# estimate for the rest
|
|
eta = full_time_per_line * (self.filesize - self.count_size)
|
|
else:
|
|
# how long for all
|
|
full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count
|
|
# estimate for the rest
|
|
eta = full_time_per_line * (self.linecount - self.count)
|
|
# just in case ...
|
|
if eta < 0:
|
|
eta = 0
|
|
# check if to show microtime
|
|
# ON
|
|
if self.microtime == 1:
|
|
eta_microtime = ftn_microtime = lg_microtime = True
|
|
# AUTO
|
|
if self.microtime == 0:
|
|
if eta > 0 and eta < 1:
|
|
eta_microtime = True
|
|
if full_time_needed > 0 and full_time_needed < 1:
|
|
ftn_microtime = True
|
|
# pre check last group: if pre comma part is same add microtime anyway
|
|
if self.last_group > 0 and self.last_group < 1:
|
|
lg_microtime = True
|
|
if self.last_group == floor(self.last_group):
|
|
lg_microtime = True
|
|
self.last_group = floor(self.last_group)
|
|
# if with filesize or without
|
|
if show_filesize:
|
|
# last group size
|
|
self.size_in_last_group = self.count_size - self.current_size
|
|
# calc kb/s if there is any filesize data
|
|
# last group
|
|
self.bytes_in_last_group = (self.size_in_last_group / self.last_group) if self.last_group else 0
|
|
# global
|
|
self.bytes_in_global = (self.count_size / full_time_needed) if full_time_needed else 0
|
|
# only used if we run with file size for the next check
|
|
self.current_size = self.count_size
|
|
|
|
if self.verbose >= 1:
|
|
self.string = (
|
|
f"Processed {_pr_p_spf}% "
|
|
"[{} / {}] | "
|
|
f"{_pr_lc} / {_pr_lc} Lines | ETA: {_pr_tf} / TR: {_pr_tf} / "
|
|
"LR: {:,} "
|
|
"lines ({:,}) in {}, {:,.2f} ({:,.2f}) lines/s, {} ({}) b/s"
|
|
).format(
|
|
float(percent),
|
|
format_bytes(self.count_size),
|
|
format_bytes(self.filesize),
|
|
self.count,
|
|
self.linecount,
|
|
convert_timestamp(eta, eta_microtime),
|
|
convert_timestamp(full_time_needed, ftn_microtime),
|
|
self.lines_processed,
|
|
self.size_in_last_group,
|
|
convert_timestamp(self.last_group, lg_microtime),
|
|
self.lines_in_global,
|
|
self.lines_in_last_group,
|
|
format_bytes(self.bytes_in_global),
|
|
format_bytes(self.bytes_in_last_group)
|
|
)
|
|
else:
|
|
if self.verbose >= 1:
|
|
self.string = (
|
|
f"Processed {_pr_p_spf}% | {_pr_lc} / {_pr_lc} Lines "
|
|
f"| ETA: {_pr_tf} / TR: {_pr_tf} / "
|
|
"LR: {:,} lines in {}, {:,.2f} ({:,.2f}) lines/s"
|
|
).format(
|
|
float(percent),
|
|
self.count,
|
|
self.linecount,
|
|
convert_timestamp(eta, eta_microtime),
|
|
convert_timestamp(full_time_needed, ftn_microtime),
|
|
self.lines_processed,
|
|
convert_timestamp(self.last_group, lg_microtime),
|
|
self.lines_in_global,
|
|
self.lines_in_last_group
|
|
)
|
|
# prefix return string with line break if flagged
|
|
self.string = ("\n" if self.prefix_lb else '') + self.string
|
|
# print the string if verbose is turned on
|
|
if self.verbose >= 1:
|
|
print(self.string)
|
|
|
|
# write back vars
|
|
self.last_percent = percent
|
|
self.eta = eta
|
|
self.full_time_needed = full_time_needed
|
|
self.lg_microtime = lg_microtime
|
|
# for the next run, check data
|
|
self.start_time = time.time()
|
|
self.current_count = self.count
|
|
# trigger if this is a change
|
|
self.change = 1
|
|
else:
|
|
# trigger if this is a change
|
|
self.change = 0
|
|
# return string
|
|
return self.string
|
|
# } END OF ShowPosition
|
|
|
|
# __END__
|