""" AUTHOR: Clemens Schwaighofer DATE CREATED: 2009/7/24 (2025/7/2) DESCRIPTION: progress percent class (perl -> python) HOW TO USE * load from progress import Progress * init prg = Progress() allowed parameters to pass are (in order) - verbose (0/1/...) : show output - precision (-2~10) : -2 (5%), -1 (10%), 0 (normal 0-100%), 1~10 (100.m~%) - microtime (1/0/-1) : show microtime in eta/run time - wide time (bool) : padd time so all time column doesn't change width of line - prefix line break (bool): add line break before string and not only after prg = Progress(verbose = 1, precision = 2) * settings methods set_wide_time(bool) set_microtime(int -1/0/1) set_prefix_lb(bool) set_verbose(0/1 int) set_precision(-2~10 int) set_linecount(int) set_filesize(int) set_start_time(time optional) set_eta_start_time(time optional) set_end_time(time optional) show_position(file pos optional) """ import time from typing import Literal from math import floor from corelibs.datetime_handling.timestamp_convert import convert_timestamp from corelibs.string_handling.byte_helpers import format_bytes class Progress(): """ file progress output information """ def __init__( self, verbose: int = 0, precision: int = 1, microtime: Literal[-1] | Literal[1] | Literal[0] = 0, wide_time: bool = False, prefix_lb: bool = False ): # set default var stuff # max lines in input self.linecount: int = 0 # max file size self.filesize: int = 0 # * comma after percent self.precision: int = 0 # * if flagged 1, then wthe wide 15 char left bound format is used self.wide_time: bool = False # * verbose status from outside self.verbose: bool = False # * microtime output for last run time (1 for enable 0 for auto -1 for disable) self.microtime: Literal[-1] | Literal[1] | Literal[0] = 0 # micro time flag for last group self.lg_microtime: bool = False # = flag if output was given self.change = 0 # = global start for the full script running time self.start: float | None = None # = for the eta time, can be set after a query or long read in, to not create a wrong ETA time self.start_run: float | None = None # loop start self.start_time: float | None = None # global end self.end: float | None = None # loop end self.end_time: float | None = None # run time in seconds, set when end time method is called self.run_time: float | None = None # = filesize current self.count_size: int | None = None # position current self.count: int = 0 # last count (position) self.current_count: int = 0 # the current file post self.file_pos: int | None = None # lines processed in the last run self.lines_processed: int = 0 # time in th seconds for the last group run (until percent change) self.last_group: float = 0 # float value, lines processed per second to the last group run self.lines_in_last_group: float = 0 # float values, lines processed per second to complete run self.lines_in_global: float = 0 # flaot value, bytes processes per second in the last group run self.bytes_in_last_group: float = 0 # float value, bytes processed per second to complete run self.bytes_in_global: float = 0 # bytes processed in last run (in bytes) self.size_in_last_group: int = 0 # current file position 8size) self.current_size: int = 0 # last percent position self.last_percent: int | float = 0 # if we have normal % or in steps of 10 self.precision_ten_step: int = 0 # the default size this is precision + 4 self.percent_print: int = 5 # this is 1 if it is 1 or 0 for precision or precision size self.percent_precision: int = 1 # prefix line with a line break self.prefix_lb: bool = False # estimated time to finish self.eta: float | None = None # run time since start self.full_time_needed: float | None = None # the actual output self.string: str = '' # initialize the class self.set_precision(precision) self.set_verbose(verbose) self.set_micro_time(microtime) self.set_wide_time(wide_time) self.set_prefix_lb(prefix_lb) self.set_start_time() def reset(self): """ resets the current progress to 0, but keeps the overall start variables set """ # reset what always gets reset self.count = 0 self.count_size = None self.current_count = 0 self.linecount = 0 self.lines_processed = 0 self.last_group = 0 self.lines_in_last_group = 0 self.lines_in_global = 0 self.bytes_in_last_group = 0 self.bytes_in_global = 0 self.size_in_last_group = 0 self.filesize = 0 self.current_size = 0 self.last_percent = 0 self.eta = 0 self.full_time_needed = 0 self.start_run = None self.start_time = None self.end_time = None def set_wide_time(self, wide_time: bool) -> bool: """ sets the show wide time flag Arguments: wide_time {bool} -- _description_ Returns: bool -- _description_ """ self.wide_time = wide_time return self.wide_time def set_micro_time(self, microtime: Literal[-1] | Literal[1] | Literal[0]) -> Literal[-1] | Literal[1] | Literal[0]: """sets the show microtime -1 OFF, 0 AUTO, 1 ON Returns: _type_ -- _description_ """ self.microtime = microtime return self.microtime def set_prefix_lb(self, prefix_lb: bool) -> bool: """ set prefix line break flag Arguments: prefix_lb {bool} -- _description_ Returns: bool -- _description_ """ self.prefix_lb = prefix_lb return self.prefix_lb def set_verbose(self, verbose: int) -> bool: """ set the internal verbose flag to 1 if any value higher than 1 is given, else sets it to 0 Arguments: verbose {int} -- _description_ Returns: bool -- _description_ """ if verbose > 0: self.verbose = True else: self.verbose = False return self.verbose def set_precision(self, precision: int) -> int: """ sets the output precision size. If -2 for five step, -1 for ten step else sets the precision normally, for 0, no precision is set, maximum precision is 10 Arguments: precision {int} -- _description_ Returns: int -- _description_ """ # if not a valid number, we set it to 0 if precision < -2 or precision > 10: precision = 0 if precision < 0: if precision < -1: self.precision_ten_step = 5 else: self.precision_ten_step = 10 self.precision = 0 # no comma self.percent_precision = 0 # no print precision self.percent_print = 3 # max 3 length else: # comma values visible self.precision = 10 if precision < 0 or precision > 10 else precision # for calcualtion of precision self.percent_precision = 10 if precision < 0 or precision > 10 else precision # for the format output base is 4, plsut he percent precision length self.percent_print = (3 if precision == 0 else 4) + self.percent_precision # return the set precision return self.precision def set_linecount(self, linecount: int) -> int: """ set the maximum lines in this file, if value is smaller than 0 or 0, then it is set to 1 Arguments: linecount {int} -- _description_ Returns: int -- _description_ """ if linecount > 0: self.linecount = linecount else: self.linecount = 1 return self.linecount def set_filesize(self, filesize: int) -> int: """ set the maximum filesize for this file, if value is smaller than 0 or 0, then it is set to 1 Arguments: filesize {int} -- _description_ Returns: int -- _description_ """ if filesize > 0: self.filesize = filesize else: self.filesize = 1 return self.filesize def set_start_time(self, time_value: float = time.time()) -> None: """ initial set of the start times, auto set Keyword Arguments: time_value {float} -- _description_ (default: {time.time()}) """ # avoid possible double set of the original start time if not self.start: self.start = time_value self.start_time = time_value self.start_run = time_value def set_eta_start_time(self, time_value: float = time.time()) -> None: """ sets the loop % run time, for correct ETA calculation calls set start time, as the main start time is only set once Keyword Arguments: time_value {float} -- _description_ (default: {time.time()}) """ self.set_start_time(time_value) def set_end_time(self, time_value: float = time.time()) -> None: """ set the end time Keyword Arguments: time_value {float} -- _description_ (default: {time.time()}) """ self.end = time_value self.end_time = time_value if self.start is None: self.start = 0 # the overall run time in micro seconds self.run_time = self.end - self.start def show_position(self, filepos: int = 0) -> str: """ processes the current position. either based on read the file size pos, or the line count Keyword Arguments: filepos {int} -- _description_ (default: {0}) Returns: str -- _description_ """ show_filesize = True # if we print from file size or line count # microtime flags eta_microtime = False ftn_microtime = False lg_microtime = False # percent precision calc # _p_spf = "{:." + str(self.precision) + "f}" # output format for percent _pr_p_spf = "{:>" + str(self.percent_print) + "." + str(self.percent_precision) + "f}" # set the linecount precision based on the final linecount, if not, leave it empty _pr_lc = "{}" if self.linecount: _pr_lc = "{:>" + str(len(str(f"{self.linecount:,}"))) + ",}" # time format, if flag is set, the wide format is used _pr_tf = "{}" if self.wide_time: _pr_tf = "{:>15}" # count up self.count += 1 # if we have file pos from parameter if filepos != 0: self.file_pos = filepos else: # we did not, so we set internal value self.file_pos = self.count # we also check if the filesize was set now if self.filesize == 0: self.filesize = self.linecount # set ignore filesize output (no data) show_filesize = False # set the count size based on the file pos, is only used if we have filesize self.count_size = self.file_pos # do normal or down to 10 (0, 10, ...) % if self.precision_ten_step: _percent = int((self.file_pos / float(self.filesize)) * 100) mod = _percent % self.precision_ten_step percent = _percent if mod == 0 else self.last_percent else: # calc percent percent = round(((self.file_pos / float(self.filesize)) * 100), self.precision) # output if percent != self.last_percent: self.end_time = time.time() # current time (for loop time) if self.start is None: self.start = 0 if self.start_time is None: self.start_time = 0 # for from the beginning full_time_needed = self.end_time - self.start # how long from the start self.last_group = self.end_time - self.start_time # how long for last loop self.lines_processed = self.count - self.current_count # how many lines processed # lines in last group self.lines_in_last_group = (self.lines_processed / self.last_group) if self.last_group else 0 # lines in global self.lines_in_global = (self.count / full_time_needed) if full_time_needed else 0 # if we have linecount or not if self.linecount == 0: full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count_size # how long for all # estimate for the rest eta = full_time_per_line * (self.filesize - self.count_size) else: # how long for all full_time_per_line = (full_time_needed if full_time_needed else 1) / self.count # estimate for the rest eta = full_time_per_line * (self.linecount - self.count) # just in case ... if eta < 0: eta = 0 # check if to show microtime # ON if self.microtime == 1: eta_microtime = ftn_microtime = lg_microtime = True # AUTO if self.microtime == 0: if eta > 0 and eta < 1: eta_microtime = True if full_time_needed > 0 and full_time_needed < 1: ftn_microtime = True # pre check last group: if pre comma part is same add microtime anyway if self.last_group > 0 and self.last_group < 1: lg_microtime = True if self.last_group == floor(self.last_group): lg_microtime = True self.last_group = floor(self.last_group) # if with filesize or without if show_filesize: # last group size self.size_in_last_group = self.count_size - self.current_size # calc kb/s if there is any filesize data # last group self.bytes_in_last_group = (self.size_in_last_group / self.last_group) if self.last_group else 0 # global self.bytes_in_global = (self.count_size / full_time_needed) if full_time_needed else 0 # only used if we run with file size for the next check self.current_size = self.count_size if self.verbose >= 1: self.string = ( f"Processed {_pr_p_spf}% " "[{} / {}] | " f"{_pr_lc} / {_pr_lc} Lines | ETA: {_pr_tf} / TR: {_pr_tf} / " "LR: {:,} " "lines ({:,}) in {}, {:,.2f} ({:,.2f}) lines/s, {} ({}) b/s" ).format( float(percent), format_bytes(self.count_size), format_bytes(self.filesize), self.count, self.linecount, convert_timestamp(eta, eta_microtime), convert_timestamp(full_time_needed, ftn_microtime), self.lines_processed, self.size_in_last_group, convert_timestamp(self.last_group, lg_microtime), self.lines_in_global, self.lines_in_last_group, format_bytes(self.bytes_in_global), format_bytes(self.bytes_in_last_group) ) else: if self.verbose >= 1: self.string = ( f"Processed {_pr_p_spf}% | {_pr_lc} / {_pr_lc} Lines " f"| ETA: {_pr_tf} / TR: {_pr_tf} / " "LR: {:,} lines in {}, {:,.2f} ({:,.2f}) lines/s" ).format( float(percent), self.count, self.linecount, convert_timestamp(eta, eta_microtime), convert_timestamp(full_time_needed, ftn_microtime), self.lines_processed, convert_timestamp(self.last_group, lg_microtime), self.lines_in_global, self.lines_in_last_group ) # prefix return string with line break if flagged self.string = ("\n" if self.prefix_lb else '') + self.string # print the string if verbose is turned on if self.verbose >= 1: print(self.string) # write back vars self.last_percent = percent self.eta = eta self.full_time_needed = full_time_needed self.lg_microtime = lg_microtime # for the next run, check data self.start_time = time.time() self.current_count = self.count # trigger if this is a change self.change = 1 else: # trigger if this is a change self.change = 0 # return string return self.string # } END OF ShowPosition # __END__