Compare commits

...

23 Commits

Author SHA1 Message Date
Clemens Schwaighofer
67f1a6688d Update Colors to have static colors that we can reset to 2025-07-09 15:22:31 +09:00
Clemens Schwaighofer
efb7968e93 time zone general names: upper case for constant 2025-07-09 15:09:12 +09:00
Clemens Schwaighofer
fe7c7db004 Change all __class__ to self for class global var 2025-07-09 15:06:27 +09:00
Clemens Schwaighofer
79d1ccae9a Readme file update 2025-07-09 14:48:54 +09:00
Clemens Schwaighofer
6e69af4aa8 Update pyproject.toml with new version 2025-07-09 14:45:53 +09:00
Clemens Schwaighofer
d500b7d473 Log update with listener Queues and color highlight for console
enables Queues if multiprocessing.Queue() is set in the "log_queue" setting

Now a logger "Log.init_worker_logging" can be attached to the ProcessPoolExecutor,
the init args must be the log_queue set

example:

```py
with concurrent.futures.ProcessPoolExecutor(
    max_workers=max_forks,
    initializer=Log.init_worker_logging,
    initargs=(log_queue,)
) as executor:
````

Move all settings into a settings argument, the structure is defined in LogSettings.
Default settings are in Log.DEFAULT_LOG_SETTINGS

Only log path and log name are parameters

Color output for console is on default enabled, disable via "console_color_output_enabled"
The complete console output can be stopped with "console_enabled"
2025-07-09 14:41:53 +09:00
Clemens Schwaighofer
ef599a1aad Version update in uv.lock 2025-07-09 09:45:35 +09:00
Clemens Schwaighofer
2d197134f1 log splitter output make length configurable 2025-07-09 09:44:55 +09:00
Clemens Schwaighofer
717080a009 Add Text ANSI colors class 2025-07-09 08:54:29 +09:00
Clemens Schwaighofer
19197c71ff README file name fix 2025-07-08 17:50:33 +09:00
Clemens Schwaighofer
051b93f2d8 TLS update in the Readme file 2025-07-08 17:46:00 +09:00
Clemens Schwaighofer
e04b3598b8 Ingore .coverage file from pytest 2025-07-08 15:59:53 +09:00
Clemens Schwaighofer
b88e0fe564 Add tests for string helpers too, update timestamp strings tests
Fix string helpers calls for some edge cases
2025-07-08 15:58:37 +09:00
Clemens Schwaighofer
060e3b4afe Move test runs into the test-run folder, add TimestampStrings, add basic tests via pytest
pytest added for dev.
Move all the test run python scripts into the "test-run" folder, so we can use the tests/ folder for just tests.

Setup one test for the TimestampStrings class with pytests in tests/unit/string_handling/test_timestamp_strings.py
2025-07-08 14:54:26 +09:00
Clemens Schwaighofer
cd07267475 v0.6.0: Fix src folder name 2025-07-08 10:04:01 +09:00
Clemens Schwaighofer
2fa031f6ee Comment out log handlers until we rebuild the logging class 2025-07-08 10:01:09 +09:00
Clemens Schwaighofer
f38cce1c1d Rename src CoreLibs to corelibs 2025-07-08 09:58:33 +09:00
Clemens Schwaighofer
52dd1e7b73 Fix base folder name, must be lower case 2025-07-08 09:56:43 +09:00
Clemens Schwaighofer
661a182655 Fix __init__.py 2025-07-08 09:47:55 +09:00
Clemens Schwaighofer
d803de312d Change log file formatter order 2025-07-08 09:46:00 +09:00
Clemens Schwaighofer
57a36d64f1 UV install information link 2025-07-04 10:55:10 +09:00
Clemens Schwaighofer
0cc4883fa1 Move the mask from string to dict helpers, add end comment 2025-07-03 14:01:40 +09:00
Clemens Schwaighofer
1eb464dd2c Add string handling helper maks to mask strings 2025-07-03 13:57:39 +09:00
54 changed files with 1508 additions and 146 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@
**/*.egg-info
.mypy_cache/
**/.env
.coverage

View File

@@ -22,6 +22,10 @@ This is a pip package that can be installed into any project and covers the foll
- script_handling: pid lock file handling, abort timer
- string_handling: byte format, datetime format, hashing, string formats for numbrers, double byte string format, etc
## UV setup
uv must be [installed](https://docs.astral.sh/uv/getting-started/installation/)
## How to publish
Have the following setup in `project.toml`
@@ -35,8 +39,8 @@ explicit = true
```
```sh
uv build --native-tls
uv publish --index egra-gitea --token <gitea token> --native-tls
uv build
uv publish --index egra-gitea --token <gitea token>
```
## Test package
@@ -44,21 +48,55 @@ uv publish --index egra-gitea --token <gitea token> --native-tls
We must set the full index URL here because we run with "--no-project"
```sh
uv run --with corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/ --no-project --native-tls -- python -c "import corelibs"
uv run --with corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/ --no-project -- python -c "import corelibs"
```
### Python tests
All python tests are the tests/ folder. They are structured by the source folder layout
run them with
```sh
uv run pytest
```
Get a coverate report
```sh
uv run pytest --cov=corelibs
```
### Other tests
In the test folder other tests are located.
In the test-run folder usage and run tests are located
At the moment only a small test for the "progress" and the "double byte string format" module is set
#### Progress
```sh
uv run --native-tls tests/progress/progress_test.py
uv run test-run/progress/progress_test.py
```
#### Double byte string format
```sh
uv run test-run/double_byte_string_format/double_byte_string_format.py
```
#### Strings helpers
```sh
uv run test-run/timestamp_strings/timestamp_strings.py
```
```sh
uv run --native-tls tests/double_byte_string_format/double_byte_string_format.py
uv run test-run/string_handling/string_helpers.py
```
#### Log
```sh
uv run test-run/logging_handling/log.py
```
## How to install in another project
@@ -66,19 +104,23 @@ uv run --native-tls tests/double_byte_string_format/double_byte_string_format.py
This will also add the index entry
```sh
uv add corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/ --native-tls
uv add corelibs --index egra-gitea=https://git.egplusww.jp/api/packages/PyPI/pypi/simple/
```
## Python venv setup
In the folder where the script will be located
```sh
uv venv --python 3.13
```
Install all neded dependencies
After clone, run the command below to install all dependenciss
```sh
uv sync
```
## NOTE on TLS problems
> [!warning] TLS problems with Netskope
If the Netskope service is running all uv runs will fail unless either --native-tls is set or the enviroment variable SSL_CERT_FILE is set, see blow
```sh
export SSL_CERT_FILE='/Library/Application Support/Netskope/STAgent/data/nscacert_combined.pem'
```

View File

@@ -1,9 +1,9 @@
# MARK: Project info
[project]
name = "corelibs"
version = "0.4.0"
version = "0.9.0"
description = "Collection of utils for Python scripts"
readme = "ReadMe.md"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"jmespath>=1.0.1",
@@ -25,6 +25,12 @@ explicit = true
requires = ["hatchling"]
build-backend = "hatchling.build"
[dependency-groups]
dev = [
"pytest>=8.4.1",
"pytest-cov>=6.2.1",
]
# MARK: Python linting
[tool.pyright]
typeCheckingMode = "strict"

View File

@@ -1,120 +0,0 @@
"""
A log handler wrapper
"""
import logging.handlers
import logging
from pathlib import Path
from typing import Mapping
class Log:
"""
logger setup
"""
EXCEPTION: int = 60
def __init__(
self,
log_path: Path,
log_name: str,
log_level_console: str = 'WARNING',
log_level_file: str = 'DEBUG',
add_start_info: bool = True
):
logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
if not log_name.endswith('.log'):
log_path = log_path.with_suffix('.log')
# overall logger settings
self.logger = logging.getLogger(log_name)
# set maximum logging level for all logging output
self.logger.setLevel(logging.DEBUG)
# console logger
self.__console_handler(log_level_console)
# file logger
self.__file_handler(log_level_file, log_path)
# if requests set a start log
if add_start_info is True:
self.break_line('START')
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
return record.levelname != "EXCEPTION"
def __console_handler(self, log_level_console: str = 'WARNING'):
# console logger
if not isinstance(getattr(logging, log_level_console.upper(), None), int):
log_level_console = 'WARNING'
console_handler = logging.StreamHandler()
formatter_console = logging.Formatter(
(
'[%(asctime)s.%(msecs)03d] '
'[%(filename)s:%(funcName)s:%(lineno)d] '
'<%(levelname)s> '
'%(message)s'
),
datefmt="%Y-%m-%d %H:%M:%S",
)
console_handler.setLevel(log_level_console)
# do not show exceptions logs on console
console_handler.addFilter(self.__filter_exceptions)
console_handler.setFormatter(formatter_console)
self.logger.addHandler(console_handler)
def __file_handler(self, log_level_file: str, log_path: Path) -> None:
# file logger
if not isinstance(getattr(logging, log_level_file.upper(), None), int):
log_level_file = 'DEBUG'
file_handler = logging.handlers.TimedRotatingFileHandler(
filename=log_path,
encoding="utf-8",
when="D",
interval=1
)
formatter_file_handler = logging.Formatter(
(
'[%(asctime)s.%(msecs)03d] '
'[%(pathname)s:%(funcName)s:%(lineno)d] '
'[%(name)s:%(process)d] '
'<%(levelname)s> '
'%(message)s'
),
datefmt="%Y-%m-%dT%H:%M:%S",
)
file_handler.setLevel(log_level_file)
file_handler.setFormatter(formatter_file_handler)
self.logger.addHandler(file_handler)
def break_line(self, info: str = "BREAK"):
"""
add a break line as info level
Keyword Arguments:
info {str} -- _description_ (default: {"BREAK"})
"""
self.logger.info("[%s] ================================>", info)
def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
"""
log on exceotion level
Args:
msg (object): _description_
*args (object): arguments for msg
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
"""
self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)
def validate_log_level(self, log_level: str) -> bool:
"""
if the log level is invalid, will erturn false
Args:
log_level (str): _description_
Returns:
bool: _description_
"""
return isinstance(getattr(logging, log_level.upper(), None), int)
# __END__

View File

@@ -89,3 +89,5 @@ class CsvWriter:
csv_row[value] = line[key]
self.csv_file_writer.writerow(csv_row)
return True
# __END__

View File

@@ -110,5 +110,4 @@ class Timer:
"""
return self._run_time
# __END__

View File

@@ -42,3 +42,5 @@ def file_name_crc(file_path: Path, add_parent_folder: bool = False) -> str:
return str(Path(file_path.parent.name).joinpath(file_path.name))
else:
return file_path.name
# __END__

View File

@@ -126,3 +126,5 @@ def value_lookup(haystack: dict[str, str], value: str, raise_on_many: bool = Fal
if raise_on_many is True and len(keys) > 1:
raise ValueError("More than one element found with the same name")
return keys[0]
# __END__

View File

@@ -0,0 +1,37 @@
"""
Dict helpers
"""
def mask(
data_set: dict[str, str],
mask_keys: list[str] | None = None,
mask_str: str = "***",
skip: bool = False
) -> dict[str, str]:
"""
mask data for output
Checks if mask_keys list exist in any key in the data set either from the start or at the end
Arguments:
data_set {dict[str, str]} -- _description_
Keyword Arguments:
mask_keys {list[str] | None} -- _description_ (default: {None})
mask_str {str} -- _description_ (default: {"***"})
skip {bool} -- _description_ (default: {False})
Returns:
dict[str, str] -- _description_
"""
if skip is True:
return data_set
if mask_keys is None:
mask_keys = ["password", "secret"]
return {
key: mask_str
if any(key.startswith(mask_key) or key.endswith(mask_key) for mask_key in mask_keys) else value
for key, value in data_set.items()
}
# __END__

View File

@@ -35,3 +35,5 @@ def dict_hash_crc(data: dict[Any, Any] | list[Any]) -> str:
return hashlib.sha256(
json.dumps(data, sort_keys=True, ensure_ascii=True).encode('utf-8')
).hexdigest()
# __END__

View File

@@ -59,3 +59,5 @@ def build_dict(
dict[str, Any | list[Any] | dict[Any, Any]],
delete_keys_from_set(any_dict, ignore_entries)
)
# __END__

View File

@@ -0,0 +1,340 @@
"""
A log handler wrapper
if log_settings['log_queue'] is set to multiprocessing.Queue it will launch with listeners
attach "init_worker_logging" with the set log_queue
"""
import re
import logging.handlers
import logging
from pathlib import Path
from typing import Mapping, TextIO, TypedDict, Any, TYPE_CHECKING, cast
from corelibs.string_handling.text_colors import Colors
if TYPE_CHECKING:
from multiprocessing import Queue
class LogSettings(TypedDict):
"""
log settings
Arguments:
TypedDict {_type_} -- _description_
"""
log_level_console: str
log_level_file: str
console_enabled: bool
console_color_output_enabled: bool
add_start_info: bool
add_end_info: bool
log_queue: 'Queue[str] | None'
class CustomConsoleFormatter(logging.Formatter):
"""
Custom formatter with colors for console output
"""
COLORS = {
"DEBUG": Colors.cyan,
"INFO": Colors.green,
"WARNING": Colors.yellow,
"ERROR": Colors.red,
"CRITICAL": Colors.magenta,
"EXCEPTION": Colors.magenta_bold,
}
def format(self, record: logging.LogRecord) -> str:
"""
set the color highlight
Arguments:
record {logging.LogRecord} -- _description_
Returns:
str -- _description_
"""
# Add color to levelname for console output
reset = Colors.reset
color = self.COLORS.get(record.levelname, reset)
# only highlight level for basic
if record.levelname in ['DEBUG', 'INFO']:
record.levelname = f"{color}{record.levelname}{reset}"
return super().format(record)
# highlight whole line
message = super().format(record)
return f"{color}{message}{reset}"
class Log:
"""
logger setup
"""
# exception level
EXCEPTION: int = 60
# spacer lenght characters and the character
SPACER_CHAR: str = '='
SPACER_LENGTH: int = 32
# default logging level
DEFAULT_LOG_LEVEL: str = 'WARNING'
# default settings
DEFAULT_LOG_SETTINGS: LogSettings = {
"log_level_console": "WARNING",
"log_level_file": "DEBUG",
"console_enabled": True,
"console_color_output_enabled": True,
"add_start_info": True,
"add_end_info": False,
"log_queue": None,
}
def __init__(
self,
log_path: Path,
log_name: str,
log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None = None,
):
# add new level for EXCEPTION
logging.addLevelName(Log.EXCEPTION, 'EXCEPTION')
# parse the logging settings
self.log_settings = self.__parse_log_settings(log_settings)
# if path, set log name with .log
# if log name with .log, strip .log for naming
if log_path.is_dir():
__log_file_name = re.sub(r'[^a-zA-Z0-9]', '', log_name)
if not log_name.endswith('.log'):
log_path = log_path.joinpath(Path(__log_file_name).with_suffix('.log'))
else:
log_path = log_path.joinpath(__log_file_name)
elif not log_path.suffix == '.log':
# add .log if the path is a file but without .log
log_path = log_path.with_suffix('.log')
# stip .log from the log name if set
if not log_name.endswith('.log'):
log_name = Path(log_name).stem
# general log name
self.log_name = log_name
self.log_queue: 'Queue[str] | None' = None
self.listener: logging.handlers.QueueListener | None = None
# setup handlers
# NOTE if console with color is set first, some of the color formatting is set
# in the file writer too, for the ones where color is set BEFORE the format
self.handlers: list[logging.StreamHandler[TextIO] | logging.handlers.TimedRotatingFileHandler] = [
# file handler, always
self.__file_handler(self.log_settings['log_level_file'], log_path)
]
if self.log_settings['console_enabled']:
# console
self.handlers.append(self.__console_handler(self.log_settings['log_level_console']))
# init listener if we have a log_queue set
self.__init_listener(self.log_settings['log_queue'])
# overall logger start
self.__init_log(log_name)
# if requests set a start log
if self.log_settings['add_start_info'] is True:
self.break_line('START')
def __del__(self):
"""
Call when class is destroyed, make sure the listender is closed or else we throw a thread error
"""
if self.log_settings['add_end_info']:
self.break_line('END')
self.stop_listener()
def __parse_log_settings(
self,
log_settings: dict[str, 'str | bool | None | Queue[str]'] | LogSettings | None
) -> LogSettings:
# skip with defaul it not set
if log_settings is None:
return self.DEFAULT_LOG_SETTINGS
# check entries
default_log_settings = self.DEFAULT_LOG_SETTINGS
# check log levels
for __log_entry in ['log_level_console', 'log_level_file']:
if log_settings.get(__log_entry) is None:
continue
# if not valid reset to default, if not in default set to WARNING
if not self.validate_log_level(_log_level := log_settings.get(__log_entry, '')):
_log_level = self.DEFAULT_LOG_SETTINGS.get(
__log_entry, self.DEFAULT_LOG_LEVEL
)
default_log_settings[__log_entry] = str(_log_level)
# check bool
for __log_entry in [
"console_enabled",
"console_color_output_enabled",
"add_start_info",
"add_end_info",
]:
if log_settings.get(__log_entry) is None:
continue
if not isinstance(__setting := log_settings.get(__log_entry, ''), bool):
__setting = self.DEFAULT_LOG_SETTINGS.get(__log_entry, True)
default_log_settings[__log_entry] = __setting
# check log queue
__setting = log_settings.get('log_queue', self.DEFAULT_LOG_SETTINGS['log_queue'])
if __setting is not None:
__setting = cast('Queue[str]', __setting)
default_log_settings['log_queue'] = __setting
return default_log_settings
def __filter_exceptions(self, record: logging.LogRecord) -> bool:
return record.levelname != "EXCEPTION"
def __console_handler(self, log_level_console: str = 'WARNING') -> logging.StreamHandler[TextIO]:
# console logger
if not isinstance(getattr(logging, log_level_console.upper(), None), int):
log_level_console = 'WARNING'
console_handler = logging.StreamHandler()
# format layouts
format_string = (
'[%(asctime)s.%(msecs)03d] '
'[%(name)s] '
'[%(filename)s:%(funcName)s:%(lineno)d] '
'<%(levelname)s> '
'%(message)s'
)
format_date = "%Y-%m-%d %H:%M:%S"
# color or not
if self.log_settings['console_color_output_enabled']:
formatter_console = CustomConsoleFormatter(format_string, datefmt=format_date)
else:
formatter_console = logging.Formatter(format_string, datefmt=format_date)
console_handler.setLevel(log_level_console)
# do not show exceptions logs on console
console_handler.addFilter(self.__filter_exceptions)
console_handler.setFormatter(formatter_console)
return console_handler
def __file_handler(self, log_level_file: str, log_path: Path) -> logging.handlers.TimedRotatingFileHandler:
# file logger
if not isinstance(getattr(logging, log_level_file.upper(), None), int):
log_level_file = 'DEBUG'
file_handler = logging.handlers.TimedRotatingFileHandler(
filename=log_path,
encoding="utf-8",
when="D",
interval=1
)
formatter_file_handler = logging.Formatter(
(
# time stamp
'[%(asctime)s.%(msecs)03d] '
# log name
'[%(name)s] '
# filename + pid
'[%(filename)s:%(process)d] '
# path + func + line number
'[%(pathname)s:%(funcName)s:%(lineno)d] '
# error level
'<%(levelname)s> '
# message
'%(message)s'
),
datefmt="%Y-%m-%dT%H:%M:%S",
)
file_handler.setLevel(log_level_file)
file_handler.setFormatter(formatter_file_handler)
return file_handler
def __init_listener(self, log_queue: 'Queue[str] | None' = None):
"""
If we have a Queue option start the logging queue
Keyword Arguments:
log_queue {Queue[str] | None} -- _description_ (default: {None})
"""
if log_queue is None:
return
self.log_queue = log_queue
self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
self.listener.start()
def __init_log(self, log_name: str) -> None:
"""
Initialize the main loggger
"""
queue_handler: logging.handlers.QueueHandler | None = None
if self.log_queue is not None:
queue_handler = logging.handlers.QueueHandler(self.log_queue)
# overall logger settings
self.logger = logging.getLogger(log_name)
# add all the handlers
if queue_handler is None:
for handler in self.handlers:
self.logger.addHandler(handler)
else:
self.logger.addHandler(queue_handler)
# set maximum logging level for all logging output
# log level filtering is done per handler
self.logger.setLevel(logging.DEBUG)
@staticmethod
def init_worker_logging(log_queue: 'Queue[str]') -> logging.Logger:
"""
This initalizes a logger that can be used in pool/thread queue calls
"""
queue_handler = logging.handlers.QueueHandler(log_queue)
# getLogger call MUST be WITHOUT and logger name
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.handlers.clear()
root_logger.addHandler(queue_handler)
# for debug only
root_logger.debug('[LOGGER] Init log: %s - %s', log_queue, root_logger.handlers)
return root_logger
def stop_listener(self):
"""
stop the listener
"""
if self.listener is not None:
self.listener.stop()
def break_line(self, info: str = "BREAK"):
"""
add a break line as info level
Keyword Arguments:
info {str} -- _description_ (default: {"BREAK"})
"""
self.logger.info("[%s] %s>", info, self.SPACER_CHAR * self.SPACER_LENGTH)
def exception(self, msg: object, *args: object, extra: Mapping[str, object] | None = None) -> None:
"""
log on exceotion level
Args:
msg (object): _description_
*args (object): arguments for msg
extra: Mapping[str, object] | None: extra arguments for the formatting if needed
"""
self.logger.log(Log.EXCEPTION, msg, *args, exc_info=True, extra=extra)
def validate_log_level(self, log_level: Any) -> bool:
"""
if the log level is invalid, will erturn false
Args:
log_level (Any): _description_
Returns:
bool: _description_
"""
if isinstance(log_level, int):
return any(getattr(logging, attr) == log_level for attr in dir(logging))
elif isinstance(log_level, str):
return isinstance(getattr(logging, log_level.upper(), None), int)
else:
return False
# __END__

View File

@@ -93,3 +93,5 @@ def unlock_run(lock_file: Path) -> None:
lock_file.unlink()
except IOError as e:
raise IOError(f"Cannot remove lock_file: {lock_file}: {e}") from e
# __END__

View File

@@ -60,5 +60,4 @@ def create_time(timestamp: float, timestamp_format: str = "%Y-%m-%d %H:%M:%S") -
"""
return time.strftime(timestamp_format, time.localtime(timestamp))
# __END__

View File

@@ -36,3 +36,5 @@ def sha1_short(string: str) -> str:
str -- _description_
"""
return hashlib.sha1(string.encode('utf-8')).hexdigest()[:9]
# __END__

View File

@@ -2,16 +2,19 @@
String helpers
"""
from decimal import Decimal, getcontext
from textwrap import shorten
def shorten_string(string: str, length: int, hard_shorten: bool = False, placeholder: str = " [~]") -> str:
def shorten_string(
string: str | int | float, length: int, hard_shorten: bool = False, placeholder: str = " [~]"
) -> str:
"""
check if entry is too long and cut it, but only for console output
Note that if there are no spaces in the string, it will automatically use the hard split mode
Args:
string (str): _description_
string (str | int | float): _description_
length (int): _description_
hard_shorten (bool): if shorte should be done on fixed string lenght. Default: False
placeholder (str): placeholder string. Default: " [~]"
@@ -19,13 +22,19 @@ def shorten_string(string: str, length: int, hard_shorten: bool = False, placeho
Returns:
str: _description_
"""
length = int(length)
string = str(string)
# if placeholder > lenght
if len(string) > length:
if hard_shorten is True or " " not in string:
# hard shorten error
if len(placeholder) > length:
raise ValueError(f"Cannot shorten string: placeholder {placeholder} is too large for max width")
short_string = f"{string[:(length - len(placeholder))]}{placeholder}"
else:
short_string = shorten(string, width=length, placeholder=placeholder)
try:
short_string = shorten(string, width=length, placeholder=placeholder)
except ValueError as e:
raise ValueError(f"Cannot shorten string: {e}") from e
else:
short_string = string
@@ -66,6 +75,9 @@ def format_number(number: float, precision: int = 0) -> str:
format numbers, current trailing zeros does not work
use {:,} or {:,.f} or {:,.<N>f} <N> = number instead of this
The upper limit of the precision depends on the value of the number itself
very large numbers will have no precision at all any more
Arguments:
number {float} -- _description_
@@ -75,12 +87,18 @@ def format_number(number: float, precision: int = 0) -> str:
Returns:
str -- _description_
"""
if precision < 0 and precision > 100:
if precision < 0 or precision > 100:
precision = 0
if precision > 0:
getcontext().prec = precision
# make it a string to avoid mangling
_number = Decimal(str(number))
else:
_number = number
return (
"{:,."
f"{str(precision)}"
"f}"
).format(number)
).format(_number)
# __END__

View File

@@ -0,0 +1,156 @@
"""
Basic ANSI colors
Set colors with print(f"something {Colors.yellow}colorful{Colors.end})
bold + underline + color combinations are possible.
"""
class Colors:
"""
ANSI colors defined
"""
# General sets, these should not be accessd
__BOLD = '\033[1m'
__UNDERLINE = '\033[4m'
__END = '\033[0m'
__RESET = '\033[0m'
# Define ANSI color codes as class attributes
__BLACK = "\033[30m"
__RED = "\033[31m"
__GREEN = "\033[32m"
__YELLOW = "\033[33m"
__BLUE = "\033[34m"
__MAGENTA = "\033[35m"
__CYAN = "\033[36m"
__WHITE = "\033[37m"
# Define bold/bright versions of the colors
__BLACK_BOLD = "\033[1;30m"
__RED_BOLD = "\033[1;31m"
__GREEN_BOLD = "\033[1;32m"
__YELLOW_BOLD = "\033[1;33m"
__BLUE_BOLD = "\033[1;34m"
__MAGENTA_BOLD = "\033[1;35m"
__CYAN_BOLD = "\033[1;36m"
__WHITE_BOLD = "\033[1;37m"
# BRIGHT, alternative
__BLACK_BRIGHT = '\033[90m'
__RED_BRIGHT = '\033[91m'
__GREEN_BRIGHT = '\033[92m'
__YELLOW_BRIGHT = '\033[93m'
__BLUE_BRIGHT = '\033[94m'
__MAGENTA_BRIGHT = '\033[95m'
__CYAN_BRIGHT = '\033[96m'
__WHITE_BRIGHT = '\033[97m'
# set access vars
bold = __BOLD
underline = __UNDERLINE
end = __END
reset = __RESET
# normal
black = __BLACK
red = __RED
green = __GREEN
yellow = __YELLOW
blue = __BLUE
magenta = __MAGENTA
cyan = __CYAN
white = __WHITE
# bold
black_bold = __BLACK_BOLD
red_bold = __RED_BOLD
green_bold = __GREEN_BOLD
yellow_bold = __YELLOW_BOLD
blue_bold = __BLUE_BOLD
magenta_bold = __MAGENTA_BOLD
cyan_bold = __CYAN_BOLD
white_bold = __WHITE_BOLD
# bright
black_bright = __BLACK_BRIGHT
red_bright = __RED_BRIGHT
green_bright = __GREEN_BRIGHT
yellow_bright = __YELLOW_BRIGHT
blue_bright = __BLUE_BRIGHT
magenta_bright = __MAGENTA_BRIGHT
cyan_bright = __CYAN_BRIGHT
white_bright = __WHITE_BRIGHT
@staticmethod
def disable():
"""
No colors
"""
Colors.bold = ''
Colors.underline = ''
Colors.end = ''
Colors.reset = ''
# normal
Colors.black = ''
Colors.red = ''
Colors.green = ''
Colors.yellow = ''
Colors.blue = ''
Colors.magenta = ''
Colors.cyan = ''
Colors.white = ''
# bold/bright
Colors.black_bold = ''
Colors.red_bold = ''
Colors.green_bold = ''
Colors.yellow_bold = ''
Colors.blue_bold = ''
Colors.magenta_bold = ''
Colors.cyan_bold = ''
Colors.white_bold = ''
# bold/bright alt
Colors.black_bright = ''
Colors.red_bright = ''
Colors.green_bright = ''
Colors.yellow_bright = ''
Colors.blue_bright = ''
Colors.magenta_bright = ''
Colors.cyan_bright = ''
Colors.white_bright = ''
@staticmethod
def reset_colors():
"""
reset colors to the original ones
"""
# set access vars
Colors.bold = Colors.__BOLD
Colors.underline = Colors.__UNDERLINE
Colors.end = Colors.__END
Colors.reset = Colors.__RESET
# normal
Colors.black = Colors.__BLACK
Colors.red = Colors.__RED
Colors.green = Colors.__GREEN
Colors.yellow = Colors.__YELLOW
Colors.blue = Colors.__BLUE
Colors.magenta = Colors.__MAGENTA
Colors.cyan = Colors.__CYAN
Colors.white = Colors.__WHITE
# bold
Colors.black_bold = Colors.__BLACK_BOLD
Colors.red_bold = Colors.__RED_BOLD
Colors.green_bold = Colors.__GREEN_BOLD
Colors.yellow_bold = Colors.__YELLOW_BOLD
Colors.blue_bold = Colors.__BLUE_BOLD
Colors.magenta_bold = Colors.__MAGENTA_BOLD
Colors.cyan_bold = Colors.__CYAN_BOLD
Colors.white_bold = Colors.__WHITE_BOLD
# bright
Colors.black_bright = Colors.__BLACK_BRIGHT
Colors.red_bright = Colors.__RED_BRIGHT
Colors.green_bright = Colors.__GREEN_BRIGHT
Colors.yellow_bright = Colors.__YELLOW_BRIGHT
Colors.blue_bright = Colors.__BLUE_BRIGHT
Colors.magenta_bright = Colors.__MAGENTA_BRIGHT
Colors.cyan_bright = Colors.__CYAN_BRIGHT
Colors.white_bright = Colors.__WHITE_BRIGHT
# __END__

View File

@@ -0,0 +1,26 @@
"""
Current timestamp strings and time zones
"""
from datetime import datetime
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
class TimestampStrings:
"""
set default time stamps
"""
TIME_ZONE: str = 'Asia/Tokyo'
def __init__(self, time_zone: str | None = None):
self.timestamp_now = datetime.now()
self.time_zone = time_zone if time_zone is not None else self.TIME_ZONE
try:
self.timestamp_now_tz = datetime.now(ZoneInfo(self.time_zone))
except ZoneInfoNotFoundError as e:
raise ValueError(f'Zone could not be loaded [{self.time_zone}]: {e}') from e
self.today = self.timestamp_now.strftime('%Y-%m-%d')
self.timestamp = self.timestamp_now.strftime("%Y-%m-%d %H:%M:%S")
self.timestamp_tz = self.timestamp_now_tz.strftime("%Y-%m-%d %H:%M:%S %Z")
self.timestamp_file = self.timestamp_now.strftime("%Y-%m-%d_%H%M%S")

View File

@@ -0,0 +1,48 @@
"""
Log logging_handling.log testing
"""
# import atexit
from pathlib import Path
from multiprocessing import Queue
# this is for testing only
from queue_logger.log_queue import QueueLogger
from corelibs.logging_handling.log import Log
def main():
"""
Log testing
"""
script_path: Path = Path(__file__).resolve().parent
log = Log(
log_path=script_path.joinpath('log', 'test.log'),
log_name="Test Log",
log_settings={
"log_level_console": 'DEBUG',
"log_level_file": 'DEBUG',
# "console_color_output_enabled": False,
}
)
log.logger.debug('Debug test: %s', log.logger.name)
log.logger.info('Info test: %s', log.logger.name)
log.logger.warning('Warning test: %s', log.logger.name)
log.logger.error('Error test: %s', log.logger.name)
log.logger.critical('Critical test: %s', log.logger.name)
log.exception('Exception test: %s', log.logger.name)
log_queue: 'Queue[str]' = Queue()
log_q = QueueLogger(
log_file=script_path.joinpath('log', 'test_queue.log'),
log_name="Test Log Queue",
log_queue=log_queue
)
log_q.mlog.info('Log test: %s', log.logger.name)
# log_q.stop_listener()
if __name__ == "__main__":
main()
# __END__

View File

@@ -0,0 +1,2 @@
*
!.gitignore

View File

@@ -0,0 +1,75 @@
"""
Pool Queue log handling
Thread Queue log handling
"""
import random
import time
from multiprocessing import Queue
import concurrent.futures
import logging
from pathlib import Path
from corelibs.logging_handling.log import Log
def work_function(log_name: str, worker_id: int, data: list[int]) -> int:
"""
simulate worker
Arguments:
worker_id {int} -- _description_
data {list[int]} -- _description_
Returns:
int -- _description_
"""
log = logging.getLogger(f'{log_name}-WorkerFn-{worker_id}')
log.info('Starting worker: %s', worker_id)
time.sleep(random.uniform(1, 3))
result = sum(data) * worker_id
return result
def main():
"""
Queue log tester
"""
print("[START] Queue logger test")
log_queue: 'Queue[str]' = Queue()
script_path: Path = Path(__file__).resolve().parent
log = Log(
log_path=script_path.joinpath('log', 'test.log'),
log_name="Test Log",
log_settings={
"log_level_console": 'INFO',
"log_level_file": 'INFO',
"log_queue": log_queue,
}
)
log.logger.info('Pool Fork logging test')
max_forks = 2
data_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
with concurrent.futures.ProcessPoolExecutor(
max_workers=max_forks,
initializer=Log.init_worker_logging,
initargs=(log_queue,)
) as executor:
log.logger.info('Start workers')
futures = [
executor.submit(work_function, log.log_name, worker_id, data)
for worker_id, data in enumerate(data_sets, 1)
]
log.logger.info('Workders started')
for future in concurrent.futures.as_completed(futures):
log.logger.info('Processing result: %s', future.result())
print(f"Processing result: {future.result()}")
log.logger.info('[END] Queue logger test')
log.stop_listener()
if __name__ == "__main__":
main()
# __END__

View File

@@ -0,0 +1,96 @@
"""
test queue logger interface
NOTE: this has all moved to the default log interface
"""
import logging
import logging.handlers
from pathlib import Path
from multiprocessing import Queue
class QueueLogger:
"""
Queue logger
"""
def __init__(self, log_file: Path, log_name: str, log_queue: 'Queue[str] | None' = None):
self.log_file = log_file
self.log_name = log_name
self.handlers = self.setup_logging()
self.log_queue: 'Queue[str]' = log_queue if log_queue is not None else Queue()
self.listener = logging.handlers.QueueListener(self.log_queue, *self.handlers)
self.listener.start()
self.mlog: logging.Logger = self.main_log(log_name)
def __del__(self):
self.mlog.info("[%s] ================================>", "END")
self.listener.stop()
def setup_logging(self):
"""
setup basic logging
"""
# Create formatters
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - [PID:%(process)d] [%(filename)s:%(lineno)d] - %(message)s'
)
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create handlers
file_handler = logging.FileHandler(self.log_file)
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setFormatter(console_formatter)
console_handler.setLevel(logging.DEBUG)
return [file_handler, console_handler]
def main_log(self, log_name: str) -> logging.Logger:
"""
main logger
Arguments:
log_name {str} -- _description_
Returns:
logging.Logger -- _description_
"""
mlog_handler = logging.handlers.QueueHandler(self.log_queue)
mlog = logging.getLogger(f'{log_name}-MainProcess')
mlog.addHandler(mlog_handler)
mlog.setLevel(logging.DEBUG)
return mlog
@staticmethod
def init_worker_logging(log_queue: 'Queue[str]', log_name: str, ):
"""
Initialize logging for worker processes
"""
# Create QueueHandler
queue_handler = logging.handlers.QueueHandler(log_queue)
# Setup root logger for this process
# NOTE: This must be EMPTY or new SINGLE NEW logger is created, we need one for EACH fork
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.handlers.clear()
root_logger.addHandler(queue_handler)
root_logger.info('[LOGGER] Init log: %s - %s', log_queue, log_name)
return root_logger
def stop_listener(self):
"""
stop the listener
"""
self.listener.stop()

View File

Can't render this file because it contains an unexpected character in line 3 and column 165.

View File

Can't render this file because it is too large.

View File

Can't render this file because it contains an unexpected character in line 3 and column 165.

View File

@@ -0,0 +1,88 @@
"""
Test string_handling/string_helpers
"""
import sys
from decimal import Decimal, getcontext
from textwrap import shorten
from corelibs.string_handling.string_helpers import shorten_string, format_number
from corelibs.string_handling.text_colors import Colors
def __sh_shorten_string():
string = "hello"
length = 3
placeholder = " [very long placeholder]"
try:
result = shorten_string(string, length, placeholder=placeholder)
print(f"IN: {string} -> {result}")
except ValueError as e:
print(f"{Colors.red}Failed: {Colors.bold}{e}{Colors.end}")
try:
result = shorten(string, width=length, placeholder=placeholder)
print(f"IN: {string} -> {result}")
except ValueError as e:
print(f"Failed: {e}")
def __sh_format_number():
print(f"Max int: {sys.maxsize}")
print(f"Max float: {sys.float_info.max}")
number = 1234.56
precision = 0
result = format_number(number, precision)
print(f"Format {number} ({precision}) -> {result}")
number = 1234.56
precision = 100
result = format_number(number, precision)
print(f"Format {number} ({precision}) -> {result}")
number = 123400000000000001.56
if number >= sys.maxsize:
print("INT Number too big")
if number >= sys.float_info.max:
print("FLOAT Number too big")
precision = 5
result = format_number(number, precision)
print(f"Format {number} ({precision}) -> {result}")
precision = 100
getcontext().prec = precision
number = Decimal(str(1234.56))
result = f"{number:,.100f}"
print(f"Format {number} ({precision}) -> {result}")
def _sh_colors():
for color in [
"black",
"red",
"green",
"yellow",
"blue",
"magenta",
"cyan",
"white",
]:
for change in ['', '_bold', '_bright']:
_color = f"{color}{change}"
print(f"Color: {getattr(Colors, _color)}{_color}{Colors.end}")
print(f"Underline: {Colors.underline}UNDERLINE{Colors.reset}")
print(f"Bold: {Colors.bold}BOLD{Colors.reset}")
print(f"Underline/Yellow: {Colors.underline}{Colors.yellow}UNDERLINE YELLOW{Colors.reset}")
print(f"Underline/Yellow/Bold: {Colors.underline}{Colors.bold}{Colors.yellow}UNDERLINE YELLOW BOLD{Colors.reset}")
def main():
"""
Test: corelibs.string_handling.string_helpers
"""
__sh_shorten_string()
__sh_format_number()
_sh_colors()
if __name__ == "__main__":
main()
# __END__

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env -S uv run --script
"""
Test for double byte format
"""
from corelibs.string_handling.timestamp_strings import TimestampStrings
def main():
ts = TimestampStrings()
print(f"TS: {ts.timestamp_now}")
try:
ts = TimestampStrings("invalid")
except ValueError as e:
print(f"Value error: {e}")
if __name__ == "__main__":
main()
# __END__

0
tests/unit/__init__.py Normal file
View File

View File

@@ -0,0 +1,237 @@
"""
PyTest: string_handling/string_helpers
"""
import pytest
from textwrap import shorten
from corelibs.string_handling.string_helpers import shorten_string, left_fill, format_number
class TestShortenString:
"""Tests for shorten_string function"""
def test_string_shorter_than_length(self):
"""Test that strings shorter than length are returned unchanged"""
result = shorten_string("hello", 10)
assert result == "hello"
def test_string_equal_to_length(self):
"""Test that strings equal to length are returned unchanged"""
result = shorten_string("hello", 5)
assert result == "hello"
def test_hard_shorten_true(self):
"""Test hard shortening with default placeholder"""
result = shorten_string("hello world", 8, hard_shorten=True)
assert result == "hell [~]"
def test_hard_shorten_custom_placeholder(self):
"""Test hard shortening with custom placeholder"""
result = shorten_string("hello world", 8, hard_shorten=True, placeholder="...")
assert result == "hello..."
def test_no_spaces_auto_hard_shorten(self):
"""Test that strings without spaces automatically use hard shorten"""
result = shorten_string("helloworld", 8)
assert result == "hell [~]"
def test_soft_shorten_with_spaces(self):
"""Test soft shortening using textwrap.shorten"""
result = shorten_string("hello world test", 12)
# Should use textwrap.shorten behavior
expected = shorten("hello world test", width=12, placeholder=" [~]")
assert result == expected
def test_placeholder_too_large_hard_shorten(self):
"""Test error when placeholder is larger than allowed length"""
with pytest.raises(ValueError, match="Cannot shorten string: placeholder .* is too large for max width"):
shorten_string("hello", 3, hard_shorten=True, placeholder=" [~]")
def test_placeholder_too_large_no_spaces(self):
"""Test error when placeholder is larger than allowed length for string without spaces"""
with pytest.raises(ValueError, match="Cannot shorten string: placeholder .* is too large for max width"):
shorten_string("hello", 3, placeholder=" [~]")
def test_textwrap_shorten_error(self):
"""Test handling of textwrap.shorten ValueError"""
# This might be tricky to trigger, but we can mock it
with pytest.raises(ValueError, match="Cannot shorten string:"):
# Very short length that might cause textwrap.shorten to fail
shorten_string("hello world", 1, hard_shorten=False)
def test_type_conversion(self):
"""Test that inputs are converted to proper types"""
result = shorten_string(12345, 8, hard_shorten=True)
assert result == "12345"
def test_empty_string(self):
"""Test with empty string"""
result = shorten_string("", 5)
assert result == ""
def test_zero_length(self):
"""Test with zero length"""
with pytest.raises(ValueError):
shorten_string("hello", 0, hard_shorten=True)
class TestLeftFill:
"""Tests for left_fill function"""
def test_basic_left_fill(self):
"""Test basic left filling with spaces"""
result = left_fill("hello", 10)
assert result == " hello"
assert len(result) == 10
def test_custom_fill_character(self):
"""Test left filling with custom character"""
result = left_fill("hello", 10, "0")
assert result == "00000hello"
def test_string_longer_than_width(self):
"""Test when string is longer than width"""
result = left_fill("hello world", 5)
assert result == "hello world" # Should return original string
def test_string_equal_to_width(self):
"""Test when string equals width"""
result = left_fill("hello", 5)
assert result == "hello"
def test_negative_width(self):
"""Test with negative width"""
result = left_fill("hello", -5)
assert result == "hello" # Should use string length
def test_zero_width(self):
"""Test with zero width"""
result = left_fill("hello", 0)
assert result == "hello" # Should return original string
def test_invalid_fill_character(self):
"""Test with invalid fill character (not single char)"""
result = left_fill("hello", 10, "abc")
assert result == " hello" # Should default to space
def test_empty_fill_character(self):
"""Test with empty fill character"""
result = left_fill("hello", 10, "")
assert result == " hello" # Should default to space
def test_empty_string(self):
"""Test with empty string"""
result = left_fill("", 5)
assert result == " "
class TestFormatNumber:
"""Tests for format_number function"""
def test_integer_default_precision(self):
"""Test formatting integer with default precision"""
result = format_number(1234)
assert result == "1,234"
def test_float_default_precision(self):
"""Test formatting float with default precision"""
result = format_number(1234.56)
assert result == "1,235" # Should round to nearest integer
def test_with_precision(self):
"""Test formatting with specified precision"""
result = format_number(1234.5678, 2)
assert result == "1,234.57"
def test_large_number(self):
"""Test formatting large number"""
result = format_number(1234567.89, 2)
assert result == "1,234,567.89"
def test_zero(self):
"""Test formatting zero"""
result = format_number(0)
assert result == "0"
def test_negative_number(self):
"""Test formatting negative number"""
result = format_number(-1234.56, 2)
assert result == "-1,234.56"
def test_negative_precision(self):
"""Test with negative precision (should default to 0)"""
result = format_number(1234.56, -1)
assert result == "1,235"
def test_excessive_precision(self):
"""Test with precision > 100 (should default to 0)"""
result = format_number(1234.56, 101)
assert result == "1,235"
def test_precision_boundary_values(self):
"""Test precision boundary values"""
# Test precision = 0 (should work)
result = format_number(1234.56, 0)
assert result == "1,235"
# Test precision = 100 (should work)
result = format_number(1234.56, 100)
assert "1,234.56" in result # Will have many trailing zeros
def test_small_decimal(self):
"""Test formatting small decimal number"""
result = format_number(0.123456, 4)
assert result == "0.1235"
def test_very_small_number(self):
"""Test formatting very small number"""
result = format_number(0.001, 3)
assert result == "0.001"
# Additional integration tests
class TestIntegration:
"""Integration tests combining functions"""
def test_format_and_fill(self):
"""Test formatting a number then left filling"""
formatted = format_number(1234.56, 2)
result = left_fill(formatted, 15)
assert result.endswith("1,234.56")
assert len(result) == 15
def test_format_and_shorten(self):
"""Test formatting a large number then shortening"""
formatted = format_number(123456789.123, 3)
result = shorten_string(formatted, 10)
assert len(result) <= 10
# Fixtures for parameterized tests
@pytest.mark.parametrize("input_str,length,expected", [
("hello", 10, "hello"),
("hello world", 5, "h [~]"),
("test", 4, "test"),
("", 5, ""),
])
def test_shorten_string_parametrized(input_str: str, length: int, expected: str):
"""Parametrized test for shorten_string"""
result = shorten_string(input_str, length, hard_shorten=True)
if expected.endswith(" [~]"):
assert result.endswith(" [~]")
assert len(result) == length
else:
assert result == expected
@pytest.mark.parametrize("number,precision,expected", [
(1000, 0, "1,000"),
(1234.56, 2, "1,234.56"),
(0, 1, "0.0"),
(-500, 0, "-500"),
])
def test_format_number_parametrized(number: float | int, precision: int, expected: str):
"""Parametrized test for format_number"""
assert format_number(number, precision) == expected
# __END__

View File

@@ -0,0 +1,157 @@
"""
PyTest: string_handling/timestamp_strings
"""
from datetime import datetime
from unittest.mock import patch, MagicMock
from zoneinfo import ZoneInfo
import pytest
# Assuming the class is in a file called timestamp_strings.py
from corelibs.string_handling.timestamp_strings import TimestampStrings
class TestTimestampStrings:
"""Test suite for TimestampStrings class"""
def test_default_initialization(self):
"""Test initialization with default timezone"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 15, 30, 45)
mock_datetime.now.return_value = mock_now
ts = TimestampStrings()
assert ts.time_zone == 'Asia/Tokyo'
assert ts.timestamp_now == mock_now
assert ts.today == '2023-12-25'
assert ts.timestamp == '2023-12-25 15:30:45'
assert ts.timestamp_file == '2023-12-25_153045'
def test_custom_timezone_initialization(self):
"""Test initialization with custom timezone"""
custom_tz = 'America/New_York'
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 15, 30, 45)
mock_datetime.now.return_value = mock_now
ts = TimestampStrings(time_zone=custom_tz)
assert ts.time_zone == custom_tz
assert ts.timestamp_now == mock_now
def test_invalid_timezone_raises_error(self):
"""Test that invalid timezone raises ValueError"""
invalid_tz = 'Invalid/Timezone'
with pytest.raises(ValueError) as exc_info:
TimestampStrings(time_zone=invalid_tz)
assert 'Zone could not be loaded [Invalid/Timezone]' in str(exc_info.value)
def test_timestamp_formats(self):
"""Test various timestamp format outputs"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
# Mock both datetime.now() calls
mock_now = datetime(2023, 12, 25, 9, 5, 3)
mock_now_tz = datetime(2023, 12, 25, 23, 5, 3, tzinfo=ZoneInfo('Asia/Tokyo'))
mock_datetime.now.side_effect = [mock_now, mock_now_tz]
ts = TimestampStrings()
assert ts.today == '2023-12-25'
assert ts.timestamp == '2023-12-25 09:05:03'
assert ts.timestamp_file == '2023-12-25_090503'
assert 'JST' in ts.timestamp_tz or 'Asia/Tokyo' in ts.timestamp_tz
def test_different_timezones_produce_different_results(self):
"""Test that different timezones produce different timestamp_tz values"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 12, 0, 0)
mock_datetime.now.return_value = mock_now
# Create instances with different timezones
ts_tokyo = TimestampStrings(time_zone='Asia/Tokyo')
ts_ny = TimestampStrings(time_zone='America/New_York')
# The timezone-aware timestamps should be different
assert ts_tokyo.time_zone != ts_ny.time_zone
# Note: The actual timestamp_tz values will depend on the mocked datetime
def test_class_default_timezone(self):
"""Test that class default timezone is correctly set"""
assert TimestampStrings.TIME_ZONE == 'Asia/Tokyo'
def test_none_timezone_uses_default(self):
"""Test that passing None for timezone uses class default"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 15, 30, 45)
mock_datetime.now.return_value = mock_now
ts = TimestampStrings(time_zone=None)
assert ts.time_zone == 'Asia/Tokyo'
def test_timestamp_file_format_no_colons(self):
"""Test that timestamp_file format doesn't contain colons (safe for filenames)"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 15, 30, 45)
mock_datetime.now.return_value = mock_now
ts = TimestampStrings()
assert ':' not in ts.timestamp_file
assert ' ' not in ts.timestamp_file
assert ts.timestamp_file == '2023-12-25_153045'
def test_multiple_instances_independent(self):
"""Test that multiple instances don't interfere with each other"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 15, 30, 45)
mock_datetime.now.return_value = mock_now
ts1 = TimestampStrings(time_zone='Asia/Tokyo')
ts2 = TimestampStrings(time_zone='Europe/London')
assert ts1.time_zone == 'Asia/Tokyo'
assert ts2.time_zone == 'Europe/London'
assert ts1.time_zone != ts2.time_zone
@patch('corelibs.string_handling.timestamp_strings.ZoneInfo')
def test_zoneinfo_called_correctly(self, mock_zoneinfo: MagicMock):
"""Test that ZoneInfo is called with correct timezone"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 15, 30, 45)
mock_datetime.now.return_value = mock_now
custom_tz = 'Europe/Paris'
ts = TimestampStrings(time_zone=custom_tz)
assert ts.time_zone == custom_tz
mock_zoneinfo.assert_called_with(custom_tz)
def test_edge_case_midnight(self):
"""Test timestamp formatting at midnight"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2023, 12, 25, 0, 0, 0)
mock_datetime.now.return_value = mock_now
ts = TimestampStrings()
assert ts.timestamp == '2023-12-25 00:00:00'
assert ts.timestamp_file == '2023-12-25_000000'
def test_edge_case_new_year(self):
"""Test timestamp formatting at new year"""
with patch('corelibs.string_handling.timestamp_strings.datetime') as mock_datetime:
mock_now = datetime(2024, 1, 1, 0, 0, 0)
mock_datetime.now.return_value = mock_now
ts = TimestampStrings()
assert ts.today == '2024-01-01'
assert ts.timestamp == '2024-01-01 00:00:00'
# __END__

120
uv.lock generated
View File

@@ -33,9 +33,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/20/94/c5790835a017658cbfabd07f3bfb549140c3ac458cfc196323996b10095a/charset_normalizer-3.4.2-py3-none-any.whl", hash = "sha256:7f56930ab0abd1c45cd15be65cc741c28b1c9a34876ce8c17a2fa107810c0af0", size = 52626, upload-time = "2025-05-02T08:34:40.053Z" },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "corelibs"
version = "0.3.1"
version = "0.9.0"
source = { editable = "." }
dependencies = [
{ name = "jmespath" },
@@ -43,6 +52,12 @@ dependencies = [
{ name = "requests" },
]
[package.dev-dependencies]
dev = [
{ name = "pytest" },
{ name = "pytest-cov" },
]
[package.metadata]
requires-dist = [
{ name = "jmespath", specifier = ">=1.0.1" },
@@ -50,6 +65,43 @@ requires-dist = [
{ name = "requests", specifier = ">=2.32.4" },
]
[package.metadata.requires-dev]
dev = [
{ name = "pytest", specifier = ">=8.4.1" },
{ name = "pytest-cov", specifier = ">=6.2.1" },
]
[[package]]
name = "coverage"
version = "7.9.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/04/b7/c0465ca253df10a9e8dae0692a4ae6e9726d245390aaef92360e1d6d3832/coverage-7.9.2.tar.gz", hash = "sha256:997024fa51e3290264ffd7492ec97d0690293ccd2b45a6cd7d82d945a4a80c8b", size = 813556, upload-time = "2025-07-03T10:54:15.101Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/9d/7a8edf7acbcaa5e5c489a646226bed9591ee1c5e6a84733c0140e9ce1ae1/coverage-7.9.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:985abe7f242e0d7bba228ab01070fde1d6c8fa12f142e43debe9ed1dde686038", size = 212367, upload-time = "2025-07-03T10:53:25.811Z" },
{ url = "https://files.pythonhosted.org/packages/e8/9e/5cd6f130150712301f7e40fb5865c1bc27b97689ec57297e568d972eec3c/coverage-7.9.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:82c3939264a76d44fde7f213924021ed31f55ef28111a19649fec90c0f109e6d", size = 212632, upload-time = "2025-07-03T10:53:27.075Z" },
{ url = "https://files.pythonhosted.org/packages/a8/de/6287a2c2036f9fd991c61cefa8c64e57390e30c894ad3aa52fac4c1e14a8/coverage-7.9.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ae5d563e970dbe04382f736ec214ef48103d1b875967c89d83c6e3f21706d5b3", size = 245793, upload-time = "2025-07-03T10:53:28.408Z" },
{ url = "https://files.pythonhosted.org/packages/06/cc/9b5a9961d8160e3cb0b558c71f8051fe08aa2dd4b502ee937225da564ed1/coverage-7.9.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bdd612e59baed2a93c8843c9a7cb902260f181370f1d772f4842987535071d14", size = 243006, upload-time = "2025-07-03T10:53:29.754Z" },
{ url = "https://files.pythonhosted.org/packages/49/d9/4616b787d9f597d6443f5588619c1c9f659e1f5fc9eebf63699eb6d34b78/coverage-7.9.2-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:256ea87cb2a1ed992bcdfc349d8042dcea1b80436f4ddf6e246d6bee4b5d73b6", size = 244990, upload-time = "2025-07-03T10:53:31.098Z" },
{ url = "https://files.pythonhosted.org/packages/48/83/801cdc10f137b2d02b005a761661649ffa60eb173dcdaeb77f571e4dc192/coverage-7.9.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:f44ae036b63c8ea432f610534a2668b0c3aee810e7037ab9d8ff6883de480f5b", size = 245157, upload-time = "2025-07-03T10:53:32.717Z" },
{ url = "https://files.pythonhosted.org/packages/c8/a4/41911ed7e9d3ceb0ffb019e7635468df7499f5cc3edca5f7dfc078e9c5ec/coverage-7.9.2-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:82d76ad87c932935417a19b10cfe7abb15fd3f923cfe47dbdaa74ef4e503752d", size = 243128, upload-time = "2025-07-03T10:53:34.009Z" },
{ url = "https://files.pythonhosted.org/packages/10/41/344543b71d31ac9cb00a664d5d0c9ef134a0fe87cb7d8430003b20fa0b7d/coverage-7.9.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:619317bb86de4193debc712b9e59d5cffd91dc1d178627ab2a77b9870deb2868", size = 244511, upload-time = "2025-07-03T10:53:35.434Z" },
{ url = "https://files.pythonhosted.org/packages/d5/81/3b68c77e4812105e2a060f6946ba9e6f898ddcdc0d2bfc8b4b152a9ae522/coverage-7.9.2-cp313-cp313-win32.whl", hash = "sha256:0a07757de9feb1dfafd16ab651e0f628fd7ce551604d1bf23e47e1ddca93f08a", size = 214765, upload-time = "2025-07-03T10:53:36.787Z" },
{ url = "https://files.pythonhosted.org/packages/06/a2/7fac400f6a346bb1a4004eb2a76fbff0e242cd48926a2ce37a22a6a1d917/coverage-7.9.2-cp313-cp313-win_amd64.whl", hash = "sha256:115db3d1f4d3f35f5bb021e270edd85011934ff97c8797216b62f461dd69374b", size = 215536, upload-time = "2025-07-03T10:53:38.188Z" },
{ url = "https://files.pythonhosted.org/packages/08/47/2c6c215452b4f90d87017e61ea0fd9e0486bb734cb515e3de56e2c32075f/coverage-7.9.2-cp313-cp313-win_arm64.whl", hash = "sha256:48f82f889c80af8b2a7bb6e158d95a3fbec6a3453a1004d04e4f3b5945a02694", size = 213943, upload-time = "2025-07-03T10:53:39.492Z" },
{ url = "https://files.pythonhosted.org/packages/a3/46/e211e942b22d6af5e0f323faa8a9bc7c447a1cf1923b64c47523f36ed488/coverage-7.9.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:55a28954545f9d2f96870b40f6c3386a59ba8ed50caf2d949676dac3ecab99f5", size = 213088, upload-time = "2025-07-03T10:53:40.874Z" },
{ url = "https://files.pythonhosted.org/packages/d2/2f/762551f97e124442eccd907bf8b0de54348635b8866a73567eb4e6417acf/coverage-7.9.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:cdef6504637731a63c133bb2e6f0f0214e2748495ec15fe42d1e219d1b133f0b", size = 213298, upload-time = "2025-07-03T10:53:42.218Z" },
{ url = "https://files.pythonhosted.org/packages/7a/b7/76d2d132b7baf7360ed69be0bcab968f151fa31abe6d067f0384439d9edb/coverage-7.9.2-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bcd5ebe66c7a97273d5d2ddd4ad0ed2e706b39630ed4b53e713d360626c3dbb3", size = 256541, upload-time = "2025-07-03T10:53:43.823Z" },
{ url = "https://files.pythonhosted.org/packages/a0/17/392b219837d7ad47d8e5974ce5f8dc3deb9f99a53b3bd4d123602f960c81/coverage-7.9.2-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9303aed20872d7a3c9cb39c5d2b9bdbe44e3a9a1aecb52920f7e7495410dfab8", size = 252761, upload-time = "2025-07-03T10:53:45.19Z" },
{ url = "https://files.pythonhosted.org/packages/d5/77/4256d3577fe1b0daa8d3836a1ebe68eaa07dd2cbaf20cf5ab1115d6949d4/coverage-7.9.2-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc18ea9e417a04d1920a9a76fe9ebd2f43ca505b81994598482f938d5c315f46", size = 254917, upload-time = "2025-07-03T10:53:46.931Z" },
{ url = "https://files.pythonhosted.org/packages/53/99/fc1a008eef1805e1ddb123cf17af864743354479ea5129a8f838c433cc2c/coverage-7.9.2-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:6406cff19880aaaadc932152242523e892faff224da29e241ce2fca329866584", size = 256147, upload-time = "2025-07-03T10:53:48.289Z" },
{ url = "https://files.pythonhosted.org/packages/92/c0/f63bf667e18b7f88c2bdb3160870e277c4874ced87e21426128d70aa741f/coverage-7.9.2-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:2d0d4f6ecdf37fcc19c88fec3e2277d5dee740fb51ffdd69b9579b8c31e4232e", size = 254261, upload-time = "2025-07-03T10:53:49.99Z" },
{ url = "https://files.pythonhosted.org/packages/8c/32/37dd1c42ce3016ff8ec9e4b607650d2e34845c0585d3518b2a93b4830c1a/coverage-7.9.2-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:c33624f50cf8de418ab2b4d6ca9eda96dc45b2c4231336bac91454520e8d1fac", size = 255099, upload-time = "2025-07-03T10:53:51.354Z" },
{ url = "https://files.pythonhosted.org/packages/da/2e/af6b86f7c95441ce82f035b3affe1cd147f727bbd92f563be35e2d585683/coverage-7.9.2-cp313-cp313t-win32.whl", hash = "sha256:1df6b76e737c6a92210eebcb2390af59a141f9e9430210595251fbaf02d46926", size = 215440, upload-time = "2025-07-03T10:53:52.808Z" },
{ url = "https://files.pythonhosted.org/packages/4d/bb/8a785d91b308867f6b2e36e41c569b367c00b70c17f54b13ac29bcd2d8c8/coverage-7.9.2-cp313-cp313t-win_amd64.whl", hash = "sha256:f5fd54310b92741ebe00d9c0d1d7b2b27463952c022da6d47c175d246a98d1bd", size = 216537, upload-time = "2025-07-03T10:53:54.273Z" },
{ url = "https://files.pythonhosted.org/packages/1d/a0/a6bffb5e0f41a47279fd45a8f3155bf193f77990ae1c30f9c224b61cacb0/coverage-7.9.2-cp313-cp313t-win_arm64.whl", hash = "sha256:c48c2375287108c887ee87d13b4070a381c6537d30e8487b24ec721bf2a781cb", size = 214398, upload-time = "2025-07-03T10:53:56.715Z" },
{ url = "https://files.pythonhosted.org/packages/3c/38/bbe2e63902847cf79036ecc75550d0698af31c91c7575352eb25190d0fb3/coverage-7.9.2-py3-none-any.whl", hash = "sha256:e425cd5b00f6fc0ed7cdbd766c70be8baab4b7839e4d4fe5fac48581dd968ea4", size = 204005, upload-time = "2025-07-03T10:54:13.491Z" },
]
[[package]]
name = "idna"
version = "3.10"
@@ -59,6 +111,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/76/c6/c88e154df9c4e1a2a66ccf0005a88dfb2650c1dffb6f5ce603dfbd452ce3/idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3", size = 70442, upload-time = "2024-09-15T18:07:37.964Z" },
]
[[package]]
name = "iniconfig"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f2/97/ebf4da567aa6827c909642694d71c9fcf53e5b504f2d96afea02718862f3/iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7", size = 4793, upload-time = "2025-03-19T20:09:59.721Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050, upload-time = "2025-03-19T20:10:01.071Z" },
]
[[package]]
name = "jmespath"
version = "1.0.1"
@@ -68,6 +129,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" },
]
[[package]]
name = "packaging"
version = "25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "psutil"
version = "7.0.0"
@@ -83,6 +162,45 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "8.4.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/08/ba/45911d754e8eba3d5a841a5ce61a65a685ff1798421ac054f85aa8747dfb/pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c", size = 1517714, upload-time = "2025-06-18T05:48:06.109Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" },
]
[[package]]
name = "pytest-cov"
version = "6.2.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "coverage" },
{ name = "pluggy" },
{ name = "pytest" },
]
sdist = { url = "https://files.pythonhosted.org/packages/18/99/668cade231f434aaa59bbfbf49469068d2ddd945000621d3d165d2e7dd7b/pytest_cov-6.2.1.tar.gz", hash = "sha256:25cc6cc0a5358204b8108ecedc51a9b57b34cc6b8c967cc2c01a4e00d8a67da2", size = 69432, upload-time = "2025-06-12T10:47:47.684Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bc/16/4ea354101abb1287856baa4af2732be351c7bee728065aed451b678153fd/pytest_cov-6.2.1-py3-none-any.whl", hash = "sha256:f5bc4c23f42f1cdd23c70b1dab1bbaef4fc505ba950d53e0081d0730dd7e86d5", size = 24644, upload-time = "2025-06-12T10:47:45.932Z" },
]
[[package]]
name = "requests"
version = "2.32.4"