Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bf83c1c394 | ||
|
|
84ce43ab93 | ||
|
|
5e0765ee24 | ||
|
|
6edf9398b7 | ||
|
|
30bf9c1bcb |
@@ -1,7 +1,7 @@
|
||||
# MARK: Project info
|
||||
[project]
|
||||
name = "corelibs"
|
||||
version = "0.27.0"
|
||||
version = "0.29.0"
|
||||
description = "Collection of utils for Python scripts"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
@@ -63,3 +63,7 @@ ignore = [
|
||||
[tool.pylint.MASTER]
|
||||
# this is for the tests/etc folders
|
||||
init-hook='import sys; sys.path.append("src/")'
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = [
|
||||
"tests",
|
||||
]
|
||||
|
||||
0
src/corelibs/db_handling/__init__.py
Normal file
0
src/corelibs/db_handling/__init__.py
Normal file
214
src/corelibs/db_handling/sqlite_io.py
Normal file
214
src/corelibs/db_handling/sqlite_io.py
Normal file
@@ -0,0 +1,214 @@
|
||||
"""
|
||||
SQLite DB::IO
|
||||
Will be moved to the CoreLibs
|
||||
also method names are subject to change
|
||||
"""
|
||||
|
||||
# import gc
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal, TYPE_CHECKING
|
||||
import sqlite3
|
||||
from corelibs.debug_handling.debug_helpers import call_stack
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
|
||||
|
||||
class SQLiteIO():
|
||||
"""Mini SQLite interface"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
log: 'Logger',
|
||||
db_name: str | Path,
|
||||
autocommit: bool = False,
|
||||
enable_fkey: bool = True,
|
||||
row_factory: str | None = None
|
||||
):
|
||||
self.log = log
|
||||
self.db_name = db_name
|
||||
self.autocommit = autocommit
|
||||
self.enable_fkey = enable_fkey
|
||||
self.row_factory = row_factory
|
||||
self.conn: sqlite3.Connection | None = self.db_connect()
|
||||
|
||||
# def __del__(self):
|
||||
# self.db_close()
|
||||
|
||||
def db_connect(self) -> sqlite3.Connection | None:
|
||||
"""
|
||||
Connect to SQLite database, create if it doesn't exist
|
||||
"""
|
||||
try:
|
||||
# Connect to database (creates if doesn't exist)
|
||||
self.conn = sqlite3.connect(self.db_name, autocommit=self.autocommit)
|
||||
self.conn.setconfig(sqlite3.SQLITE_DBCONFIG_ENABLE_FKEY, True)
|
||||
# self.conn.execute("PRAGMA journal_mode=WAL")
|
||||
# self.log.debug(f"Connected to database: {self.db_name}")
|
||||
|
||||
def dict_factory(cursor: sqlite3.Cursor, row: list[Any]):
|
||||
fields = [column[0] for column in cursor.description]
|
||||
return dict(zip(fields, row))
|
||||
|
||||
match self.row_factory:
|
||||
case 'Row':
|
||||
self.conn.row_factory = sqlite3.Row
|
||||
case 'Dict':
|
||||
self.conn.row_factory = dict_factory
|
||||
case _:
|
||||
self.conn.row_factory = None
|
||||
|
||||
return self.conn
|
||||
except (sqlite3.Error, sqlite3.OperationalError) as e:
|
||||
self.log.error(f"Error connecting to database [{type(e).__name__}] [{self.db_name}]: {e} [{call_stack()}]")
|
||||
self.log.error(f"Error code: {e.sqlite_errorcode if hasattr(e, 'sqlite_errorcode') else 'N/A'}")
|
||||
self.log.error(f"Error name: {e.sqlite_errorname if hasattr(e, 'sqlite_errorname') else 'N/A'}")
|
||||
return None
|
||||
|
||||
def db_close(self):
|
||||
"""close connection"""
|
||||
if self.conn is not None:
|
||||
self.conn.close()
|
||||
self.conn = None
|
||||
|
||||
def db_connected(self) -> bool:
|
||||
"""
|
||||
Return True if db connection is not none
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
return True if self.conn else False
|
||||
|
||||
def __content_exists(self, content_name: str, sql_type: str) -> bool:
|
||||
"""
|
||||
Check if some content name for a certain type exists
|
||||
|
||||
Arguments:
|
||||
content_name {str} -- _description_
|
||||
sql_type {str} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
if self.conn is None:
|
||||
return False
|
||||
try:
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT name
|
||||
FROM sqlite_master
|
||||
WHERE type = ? AND name = ?
|
||||
""", (sql_type, content_name,))
|
||||
return cursor.fetchone() is not None
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error checking table [{content_name}/{sql_type}] existence: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
"""
|
||||
Check if a table exists in the database
|
||||
"""
|
||||
return self.__content_exists(table_name, 'table')
|
||||
|
||||
def trigger_exists(self, trigger_name: str) -> bool:
|
||||
"""
|
||||
Check if a triggere exits
|
||||
"""
|
||||
return self.__content_exists(trigger_name, 'trigger')
|
||||
|
||||
def index_exists(self, index_name: str) -> bool:
|
||||
"""
|
||||
Check if a triggere exits
|
||||
"""
|
||||
return self.__content_exists(index_name, 'index')
|
||||
|
||||
def meta_data_detail(self, table_name: str) -> list[tuple[Any, ...]] | list[dict[str, Any]] | Literal[False]:
|
||||
"""table detail"""
|
||||
query_show_table = """
|
||||
SELECT
|
||||
ti.cid, ti.name, ti.type, ti.'notnull', ti.dflt_value, ti.pk,
|
||||
il_ii.idx_name, il_ii.idx_unique, il_ii.idx_origin, il_ii.idx_partial
|
||||
FROM
|
||||
sqlite_schema AS m,
|
||||
pragma_table_info(m.name) AS ti
|
||||
LEFT JOIN (
|
||||
SELECT
|
||||
il.name AS idx_name, il.'unique' AS idx_unique, il.origin AS idx_origin, il.partial AS idx_partial,
|
||||
ii.cid AS tbl_cid
|
||||
FROM
|
||||
sqlite_schema AS m,
|
||||
pragma_index_list(m.name) AS il,
|
||||
pragma_index_info(il.name) AS ii
|
||||
WHERE m.name = ?1
|
||||
) AS il_ii ON (ti.cid = il_ii.tbl_cid)
|
||||
WHERE
|
||||
m.name = ?1
|
||||
"""
|
||||
return self.execute_query(query_show_table, (table_name,))
|
||||
|
||||
def execute_cursor(
|
||||
self, query: str, params: tuple[Any, ...] | None = None
|
||||
) -> sqlite3.Cursor | Literal[False]:
|
||||
"""execute a cursor, used in execute query or return one and for fetch_row"""
|
||||
if self.conn is None:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
cursor = self.conn.cursor()
|
||||
if params:
|
||||
cursor.execute(query, params)
|
||||
else:
|
||||
cursor.execute(query)
|
||||
return cursor
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during executing cursor [{query}:{params}]: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def execute_query(
|
||||
self, query: str, params: tuple[Any, ...] | None = None
|
||||
) -> list[tuple[Any, ...]] | list[dict[str, Any]] | Literal[False]:
|
||||
"""query execute with or without params, returns result"""
|
||||
if self.conn is None:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
if (cursor := self.execute_cursor(query, params)) is False:
|
||||
return False
|
||||
# fetch before commit because we need to get the RETURN before
|
||||
result = cursor.fetchall()
|
||||
# this is for INSERT/UPDATE/CREATE only
|
||||
self.conn.commit()
|
||||
return result
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during executing query [{query}:{params}]: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def return_one(
|
||||
self, query: str, params: tuple[Any, ...] | None = None
|
||||
) -> tuple[Any, ...] | dict[str, Any] | Literal[False] | None:
|
||||
"""return one row, only for SELECT"""
|
||||
if self.conn is None:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
if (cursor := self.execute_cursor(query, params)) is False:
|
||||
return False
|
||||
return cursor.fetchone()
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during return one: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def fetch_row(
|
||||
self, cursor: sqlite3.Cursor | Literal[False]
|
||||
) -> tuple[Any, ...] | dict[str, Any] | Literal[False] | None:
|
||||
"""read from cursor"""
|
||||
if self.conn is None or cursor is False:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
return cursor.fetchone()
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during fetch row: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
# __END__
|
||||
75
src/corelibs/var_handling/enum_base.py
Normal file
75
src/corelibs/var_handling/enum_base.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""
|
||||
Enum base classes
|
||||
"""
|
||||
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
|
||||
class EnumBase(Enum):
|
||||
"""
|
||||
base for enum
|
||||
lookup_any and from_any will return "EnumBase" and the sub class name
|
||||
run the return again to "from_any" to get a clean value, or cast it
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def lookup_key(cls, enum_key: str):
|
||||
"""Lookup from key side (must be string)"""
|
||||
# if there is a ":", then this is legacy, replace with ___
|
||||
if ":" in enum_key:
|
||||
enum_key = enum_key.replace(':', '___')
|
||||
try:
|
||||
return cls[enum_key.upper()]
|
||||
except KeyError as e:
|
||||
raise ValueError(f"Invalid key: {enum_key}") from e
|
||||
except AttributeError as e:
|
||||
raise ValueError(f"Invalid key: {enum_key}") from e
|
||||
|
||||
@classmethod
|
||||
def lookup_value(cls, enum_value: Any):
|
||||
"""Lookup through value side"""
|
||||
try:
|
||||
return cls(enum_value)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid value: {enum_value}") from e
|
||||
|
||||
@classmethod
|
||||
def from_any(cls, enum_any: Any):
|
||||
"""
|
||||
This only works in the following order
|
||||
-> class itself, as is
|
||||
-> str, assume key lookup
|
||||
-> if failed try other
|
||||
|
||||
Arguments:
|
||||
enum_any {Any} -- _description_
|
||||
|
||||
Returns:
|
||||
_type_ -- _description_
|
||||
"""
|
||||
if isinstance(enum_any, cls):
|
||||
return enum_any
|
||||
# try key first if it is string
|
||||
# if failed try value
|
||||
if isinstance(enum_any, str):
|
||||
try:
|
||||
return cls.lookup_key(enum_any)
|
||||
except (ValueError, AttributeError):
|
||||
try:
|
||||
return cls.lookup_value(enum_any)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Could not find as key or value: {enum_any}") from e
|
||||
return cls.lookup_value(enum_any)
|
||||
|
||||
def to_value(self) -> Any:
|
||||
"""Convert to value"""
|
||||
return self.value
|
||||
|
||||
def to_lower_case(self) -> str:
|
||||
"""return lower case"""
|
||||
return self.name.lower()
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""return [Enum].NAME like it was called with .name"""
|
||||
return self.name
|
||||
@@ -12,6 +12,7 @@ from corelibs.config_handling.settings_loader_handling.settings_loader_check imp
|
||||
SCRIPT_PATH: Path = Path(__file__).resolve().parent
|
||||
ROOT_PATH: Path = SCRIPT_PATH
|
||||
CONFIG_DIR: Path = Path("config")
|
||||
LOG_DIR: Path = Path("log")
|
||||
CONFIG_FILE: str = "settings.ini"
|
||||
|
||||
|
||||
@@ -26,9 +27,8 @@ def main():
|
||||
print(f"regex {regex_c} check against {value} -> {result}")
|
||||
|
||||
# for log testing
|
||||
script_path: Path = Path(__file__).resolve().parent
|
||||
log = Log(
|
||||
log_path=script_path.joinpath('log', 'settings_loader.log'),
|
||||
log_path=ROOT_PATH.joinpath(LOG_DIR, 'settings_loader.log'),
|
||||
log_name="Settings Loader",
|
||||
log_settings={
|
||||
"log_level_console": 'DEBUG',
|
||||
|
||||
2
test-run/db_handling/database/.gitignore
vendored
Normal file
2
test-run/db_handling/database/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
*
|
||||
!.gitignore
|
||||
2
test-run/db_handling/log/.gitignore
vendored
Normal file
2
test-run/db_handling/log/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
*
|
||||
!.gitignore
|
||||
148
test-run/db_handling/sqlite_io.py
Normal file
148
test-run/db_handling/sqlite_io.py
Normal file
@@ -0,0 +1,148 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Main comment
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from uuid import uuid4
|
||||
import json
|
||||
import sqlite3
|
||||
from corelibs.debug_handling.dump_data import dump_data
|
||||
from corelibs.logging_handling.log import Log, Logger
|
||||
from corelibs.db_handling.sqlite_io import SQLiteIO
|
||||
|
||||
SCRIPT_PATH: Path = Path(__file__).resolve().parent
|
||||
ROOT_PATH: Path = SCRIPT_PATH
|
||||
DATABASE_DIR: Path = Path("database")
|
||||
LOG_DIR: Path = Path("log")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Comment
|
||||
"""
|
||||
log = Log(
|
||||
log_path=ROOT_PATH.joinpath(LOG_DIR, 'sqlite_io.log'),
|
||||
log_name="SQLite IO",
|
||||
log_settings={
|
||||
"log_level_console": 'DEBUG',
|
||||
"log_level_file": 'DEBUG',
|
||||
}
|
||||
)
|
||||
db = SQLiteIO(
|
||||
log=Logger(log.get_logger_settings()),
|
||||
db_name=ROOT_PATH.joinpath(DATABASE_DIR, 'test_sqlite_io.db'),
|
||||
row_factory='Dict'
|
||||
)
|
||||
if db.db_connected():
|
||||
log.info(f"Connected to DB: {db.db_name}")
|
||||
if db.trigger_exists('trg_test_a_set_date_updated_on_update'):
|
||||
log.info("Trigger trg_test_a_set_date_updated_on_update exists")
|
||||
if db.table_exists('test_a'):
|
||||
log.info("Table test_a exists, dropping for clean test")
|
||||
db.execute_query("DROP TABLE test_a;")
|
||||
# create a dummy table
|
||||
table_sql = """
|
||||
CREATE TABLE IF NOT EXISTS test_a (
|
||||
test_a_id INTEGER PRIMARY KEY,
|
||||
date_created TEXT DEFAULT (strftime('%Y-%m-%d %H:%M:%f', 'now')),
|
||||
date_updated TEXT,
|
||||
uid TEXT NOT NULL UNIQUE,
|
||||
set_current_timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
|
||||
text_a TEXT,
|
||||
content,
|
||||
int_a INTEGER,
|
||||
float_a REAL
|
||||
);
|
||||
"""
|
||||
result = db.execute_query(table_sql)
|
||||
log.debug(f"Create table result: {result}")
|
||||
trigger_sql = """
|
||||
CREATE TRIGGER trg_test_a_set_date_updated_on_update
|
||||
AFTER UPDATE ON test_a
|
||||
FOR EACH ROW
|
||||
WHEN OLD.date_updated IS NULL OR NEW.date_updated = OLD.date_updated
|
||||
BEGIN
|
||||
UPDATE test_a
|
||||
SET date_updated = (strftime('%Y-%m-%d %H:%M:%f', 'now'))
|
||||
WHERE test_a_id = NEW.test_a_id;
|
||||
END;
|
||||
"""
|
||||
result = db.execute_query(trigger_sql)
|
||||
log.debug(f"Create trigger result: {result}")
|
||||
result = db.meta_data_detail('test_a')
|
||||
log.debug(f"Table meta data detail: {dump_data(result)}")
|
||||
# INSERT DATA
|
||||
sql = """
|
||||
INSERT INTO test_a (uid, text_a, content, int_a, float_a)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
RETURNING test_a_id, uid;
|
||||
"""
|
||||
result = db.execute_query(
|
||||
sql,
|
||||
(
|
||||
str(uuid4()),
|
||||
'Some text A',
|
||||
json.dumps({'foo': 'bar', 'number': 42}),
|
||||
123,
|
||||
123.456,
|
||||
)
|
||||
)
|
||||
log.debug(f"[1] Insert data result: {dump_data(result)}")
|
||||
__uid: str = ''
|
||||
if result is not False:
|
||||
# first one only of interest
|
||||
result = dict(result[0])
|
||||
__uid = str(result.get('uid', ''))
|
||||
# second insert
|
||||
result = db.execute_query(
|
||||
sql,
|
||||
(
|
||||
str(uuid4()),
|
||||
'Some text A',
|
||||
json.dumps({'foo': 'bar', 'number': 42}),
|
||||
123,
|
||||
123.456,
|
||||
)
|
||||
)
|
||||
log.debug(f"[2] Insert data result: {dump_data(result)}")
|
||||
result = db.execute_query("SELECT * FROM test_a;")
|
||||
log.debug(f"Select data result: {dump_data(result)}")
|
||||
result = db.return_one("SELECT * FROM test_a WHERE uid = ?;", (__uid,))
|
||||
log.debug(f"Fetch row result: {dump_data(result)}")
|
||||
sql = """
|
||||
UPDATE test_a
|
||||
SET text_a = ?
|
||||
WHERE uid = ?;
|
||||
"""
|
||||
result = db.execute_query(
|
||||
sql,
|
||||
(
|
||||
'Some updated text A',
|
||||
__uid,
|
||||
)
|
||||
)
|
||||
log.debug(f"Update data result: {dump_data(result)}")
|
||||
result = db.return_one("SELECT * FROM test_a WHERE uid = ?;", (__uid,))
|
||||
log.debug(f"Fetch row after update result: {dump_data(result)}")
|
||||
|
||||
db.db_close()
|
||||
|
||||
db = SQLiteIO(
|
||||
log=Logger(log.get_logger_settings()),
|
||||
db_name=ROOT_PATH.joinpath(DATABASE_DIR, 'test_sqlite_io.db'),
|
||||
row_factory='Row'
|
||||
)
|
||||
result = db.return_one("SELECT * FROM test_a WHERE uid = ?;", (__uid,))
|
||||
if result is not None and result is not False:
|
||||
log.debug(f"Fetch row result: {dump_data(result)} -> {dict(result)} -> {result.keys()}")
|
||||
log.debug(f"Access via index: {result[5]} -> {result['text_a']}")
|
||||
if isinstance(result, sqlite3.Row):
|
||||
log.debug('Result is sqlite3.Row as expected')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# __END__
|
||||
29
test-run/var_handling/enum_base.py
Normal file
29
test-run/var_handling/enum_base.py
Normal file
@@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Enum handling
|
||||
"""
|
||||
|
||||
from corelibs.var_handling.enum_base import EnumBase
|
||||
|
||||
|
||||
class TestBlock(EnumBase):
|
||||
"""Test block enum"""
|
||||
BLOCK_A = "block_a"
|
||||
HAS_NUM = 5
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Comment
|
||||
"""
|
||||
|
||||
print(f"BLOCK A: {TestBlock.from_any('BLOCK_A')}")
|
||||
print(f"HAS NUM: {TestBlock.from_any(5)}")
|
||||
print(f"DIRECT BLOCK: {TestBlock.BLOCK_A.name} -> {TestBlock.BLOCK_A.value}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
# __END__
|
||||
3
tests/unit/db_handling/__init__.py
Normal file
3
tests/unit/db_handling/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
db_handling tests
|
||||
"""
|
||||
1133
tests/unit/db_handling/test_sqlite_io.py
Normal file
1133
tests/unit/db_handling/test_sqlite_io.py
Normal file
File diff suppressed because it is too large
Load Diff
3
tests/unit/var_handling/__init__.py
Normal file
3
tests/unit/var_handling/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
var_handling tests
|
||||
"""
|
||||
546
tests/unit/var_handling/test_enum_base.py
Normal file
546
tests/unit/var_handling/test_enum_base.py
Normal file
@@ -0,0 +1,546 @@
|
||||
"""
|
||||
var_handling.enum_base tests
|
||||
"""
|
||||
|
||||
from typing import Any
|
||||
import pytest
|
||||
from corelibs.var_handling.enum_base import EnumBase
|
||||
|
||||
|
||||
class SampleBlock(EnumBase):
|
||||
"""Sample block enum for testing purposes"""
|
||||
BLOCK_A = "block_a"
|
||||
BLOCK_B = "block_b"
|
||||
HAS_NUM = 5
|
||||
HAS_FLOAT = 3.14
|
||||
LEGACY_KEY = "legacy_value"
|
||||
|
||||
|
||||
class SimpleEnum(EnumBase):
|
||||
"""Simple enum with string values"""
|
||||
OPTION_ONE = "one"
|
||||
OPTION_TWO = "two"
|
||||
OPTION_THREE = "three"
|
||||
|
||||
|
||||
class NumericEnum(EnumBase):
|
||||
"""Enum with only numeric values"""
|
||||
FIRST = 1
|
||||
SECOND = 2
|
||||
THIRD = 3
|
||||
|
||||
|
||||
class TestEnumBaseLookupKey:
|
||||
"""Test cases for lookup_key class method"""
|
||||
|
||||
def test_lookup_key_valid_uppercase(self):
|
||||
"""Test lookup_key with valid uppercase key"""
|
||||
result = SampleBlock.lookup_key("BLOCK_A")
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
assert result.name == "BLOCK_A"
|
||||
assert result.value == "block_a"
|
||||
|
||||
def test_lookup_key_valid_lowercase(self):
|
||||
"""Test lookup_key with valid lowercase key (should convert to uppercase)"""
|
||||
result = SampleBlock.lookup_key("block_a")
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
assert result.name == "BLOCK_A"
|
||||
|
||||
def test_lookup_key_valid_mixed_case(self):
|
||||
"""Test lookup_key with mixed case key"""
|
||||
result = SampleBlock.lookup_key("BlOcK_a")
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
assert result.name == "BLOCK_A"
|
||||
|
||||
def test_lookup_key_with_numeric_enum(self):
|
||||
"""Test lookup_key with numeric enum member"""
|
||||
result = SampleBlock.lookup_key("HAS_NUM")
|
||||
assert result == SampleBlock.HAS_NUM
|
||||
assert result.value == 5
|
||||
|
||||
def test_lookup_key_legacy_colon_replacement(self):
|
||||
"""Test lookup_key with legacy colon format (converts : to ___)"""
|
||||
# This assumes the enum has a key that might be accessed with legacy format
|
||||
# Should convert : to ___ and look up LEGACY___KEY
|
||||
# Since we don't have this key, we test the behavior with a valid conversion
|
||||
# Let's test with a known key that would work
|
||||
with pytest.raises(ValueError, match="Invalid key"):
|
||||
SampleBlock.lookup_key("BLOCK:A") # Should fail as BLOCK___A doesn't exist
|
||||
|
||||
def test_lookup_key_invalid_key(self):
|
||||
"""Test lookup_key with invalid key"""
|
||||
with pytest.raises(ValueError, match="Invalid key: NONEXISTENT"):
|
||||
SampleBlock.lookup_key("NONEXISTENT")
|
||||
|
||||
def test_lookup_key_empty_string(self):
|
||||
"""Test lookup_key with empty string"""
|
||||
with pytest.raises(ValueError, match="Invalid key"):
|
||||
SampleBlock.lookup_key("")
|
||||
|
||||
def test_lookup_key_with_special_characters(self):
|
||||
"""Test lookup_key with special characters that might cause AttributeError"""
|
||||
with pytest.raises(ValueError, match="Invalid key"):
|
||||
SampleBlock.lookup_key("@#$%")
|
||||
|
||||
def test_lookup_key_numeric_string(self):
|
||||
"""Test lookup_key with numeric string that isn't a key"""
|
||||
with pytest.raises(ValueError, match="Invalid key"):
|
||||
SampleBlock.lookup_key("123")
|
||||
|
||||
|
||||
class TestEnumBaseLookupValue:
|
||||
"""Test cases for lookup_value class method"""
|
||||
|
||||
def test_lookup_value_valid_string(self):
|
||||
"""Test lookup_value with valid string value"""
|
||||
result = SampleBlock.lookup_value("block_a")
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
assert result.name == "BLOCK_A"
|
||||
assert result.value == "block_a"
|
||||
|
||||
def test_lookup_value_valid_integer(self):
|
||||
"""Test lookup_value with valid integer value"""
|
||||
result = SampleBlock.lookup_value(5)
|
||||
assert result == SampleBlock.HAS_NUM
|
||||
assert result.name == "HAS_NUM"
|
||||
assert result.value == 5
|
||||
|
||||
def test_lookup_value_valid_float(self):
|
||||
"""Test lookup_value with valid float value"""
|
||||
result = SampleBlock.lookup_value(3.14)
|
||||
assert result == SampleBlock.HAS_FLOAT
|
||||
assert result.name == "HAS_FLOAT"
|
||||
assert result.value == 3.14
|
||||
|
||||
def test_lookup_value_invalid_string(self):
|
||||
"""Test lookup_value with invalid string value"""
|
||||
with pytest.raises(ValueError, match="Invalid value: nonexistent"):
|
||||
SampleBlock.lookup_value("nonexistent")
|
||||
|
||||
def test_lookup_value_invalid_integer(self):
|
||||
"""Test lookup_value with invalid integer value"""
|
||||
with pytest.raises(ValueError, match="Invalid value: 999"):
|
||||
SampleBlock.lookup_value(999)
|
||||
|
||||
def test_lookup_value_case_sensitive(self):
|
||||
"""Test that lookup_value is case-sensitive for string values"""
|
||||
with pytest.raises(ValueError, match="Invalid value"):
|
||||
SampleBlock.lookup_value("BLOCK_A") # Value is "block_a", not "BLOCK_A"
|
||||
|
||||
|
||||
class TestEnumBaseFromAny:
|
||||
"""Test cases for from_any class method"""
|
||||
|
||||
def test_from_any_with_enum_instance(self):
|
||||
"""Test from_any with an enum instance (should return as-is)"""
|
||||
enum_instance = SampleBlock.BLOCK_A
|
||||
result = SampleBlock.from_any(enum_instance)
|
||||
assert result is enum_instance
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
|
||||
def test_from_any_with_string_as_key(self):
|
||||
"""Test from_any with string that matches a key"""
|
||||
result = SampleBlock.from_any("BLOCK_A")
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
assert result.name == "BLOCK_A"
|
||||
assert result.value == "block_a"
|
||||
|
||||
def test_from_any_with_string_as_key_lowercase(self):
|
||||
"""Test from_any with lowercase string key"""
|
||||
result = SampleBlock.from_any("block_a")
|
||||
# Should first try as key (convert to uppercase and find BLOCK_A)
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
|
||||
def test_from_any_with_string_as_value(self):
|
||||
"""Test from_any with string that only matches a value"""
|
||||
# Use a value that isn't also a valid key
|
||||
result = SampleBlock.from_any("block_b")
|
||||
# Should try key first (fail), then value (succeed)
|
||||
assert result == SampleBlock.BLOCK_B
|
||||
assert result.value == "block_b"
|
||||
|
||||
def test_from_any_with_integer(self):
|
||||
"""Test from_any with integer value"""
|
||||
result = SampleBlock.from_any(5)
|
||||
assert result == SampleBlock.HAS_NUM
|
||||
assert result.value == 5
|
||||
|
||||
def test_from_any_with_float(self):
|
||||
"""Test from_any with float value"""
|
||||
result = SampleBlock.from_any(3.14)
|
||||
assert result == SampleBlock.HAS_FLOAT
|
||||
assert result.value == 3.14
|
||||
|
||||
def test_from_any_with_invalid_string(self):
|
||||
"""Test from_any with string that doesn't match key or value"""
|
||||
with pytest.raises(ValueError, match="Could not find as key or value: invalid_string"):
|
||||
SampleBlock.from_any("invalid_string")
|
||||
|
||||
def test_from_any_with_invalid_integer(self):
|
||||
"""Test from_any with integer that doesn't match any value"""
|
||||
with pytest.raises(ValueError, match="Invalid value: 999"):
|
||||
SampleBlock.from_any(999)
|
||||
|
||||
def test_from_any_string_key_priority(self):
|
||||
"""Test that from_any tries key lookup before value for strings"""
|
||||
# Create an enum where a value matches another key
|
||||
class AmbiguousEnum(EnumBase):
|
||||
KEY_A = "key_b" # Value is the name of another key
|
||||
KEY_B = "value_b"
|
||||
|
||||
# When we look up "KEY_B", it should find it as a key, not as value "key_b"
|
||||
result = AmbiguousEnum.from_any("KEY_B")
|
||||
assert result == AmbiguousEnum.KEY_B
|
||||
assert result.value == "value_b"
|
||||
|
||||
|
||||
class TestEnumBaseToValue:
|
||||
"""Test cases for to_value instance method"""
|
||||
|
||||
def test_to_value_string_value(self):
|
||||
"""Test to_value with string enum value"""
|
||||
result = SampleBlock.BLOCK_A.to_value()
|
||||
assert result == "block_a"
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_to_value_integer_value(self):
|
||||
"""Test to_value with integer enum value"""
|
||||
result = SampleBlock.HAS_NUM.to_value()
|
||||
assert result == 5
|
||||
assert isinstance(result, int)
|
||||
|
||||
def test_to_value_float_value(self):
|
||||
"""Test to_value with float enum value"""
|
||||
result = SampleBlock.HAS_FLOAT.to_value()
|
||||
assert result == 3.14
|
||||
assert isinstance(result, float)
|
||||
|
||||
def test_to_value_equals_value_attribute(self):
|
||||
"""Test that to_value returns the same as .value"""
|
||||
enum_instance = SampleBlock.BLOCK_A
|
||||
assert enum_instance.to_value() == enum_instance.value
|
||||
|
||||
|
||||
class TestEnumBaseToLowerCase:
|
||||
"""Test cases for to_lower_case instance method"""
|
||||
|
||||
def test_to_lower_case_uppercase_name(self):
|
||||
"""Test to_lower_case with uppercase enum name"""
|
||||
result = SampleBlock.BLOCK_A.to_lower_case()
|
||||
assert result == "block_a"
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_to_lower_case_mixed_name(self):
|
||||
"""Test to_lower_case with name containing underscores"""
|
||||
result = SampleBlock.HAS_NUM.to_lower_case()
|
||||
assert result == "has_num"
|
||||
|
||||
def test_to_lower_case_consistency(self):
|
||||
"""Test that to_lower_case always returns lowercase"""
|
||||
for member in SampleBlock:
|
||||
result = member.to_lower_case()
|
||||
assert result == result.lower()
|
||||
assert result == member.name.lower()
|
||||
|
||||
|
||||
class TestEnumBaseStrMethod:
|
||||
"""Test cases for __str__ magic method"""
|
||||
|
||||
def test_str_returns_name(self):
|
||||
"""Test that str() returns the enum name"""
|
||||
result = str(SampleBlock.BLOCK_A)
|
||||
assert result == "BLOCK_A"
|
||||
assert result == SampleBlock.BLOCK_A.name
|
||||
|
||||
def test_str_all_members(self):
|
||||
"""Test str() for all enum members"""
|
||||
for member in SampleBlock:
|
||||
result = str(member)
|
||||
assert result == member.name
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_str_in_formatting(self):
|
||||
"""Test that str works in string formatting"""
|
||||
formatted = f"Enum: {SampleBlock.BLOCK_A}"
|
||||
assert formatted == "Enum: BLOCK_A"
|
||||
|
||||
def test_str_vs_repr(self):
|
||||
"""Test difference between str and repr"""
|
||||
enum_instance = SampleBlock.BLOCK_A
|
||||
str_result = str(enum_instance)
|
||||
repr_result = repr(enum_instance)
|
||||
|
||||
assert str_result == "BLOCK_A"
|
||||
# repr should include class name
|
||||
assert "SampleBlock" in repr_result
|
||||
|
||||
|
||||
# Parametrized tests for comprehensive coverage
|
||||
class TestParametrized:
|
||||
"""Parametrized tests for better coverage"""
|
||||
|
||||
@pytest.mark.parametrize("key,expected_member", [
|
||||
("BLOCK_A", SampleBlock.BLOCK_A),
|
||||
("block_a", SampleBlock.BLOCK_A),
|
||||
("BLOCK_B", SampleBlock.BLOCK_B),
|
||||
("HAS_NUM", SampleBlock.HAS_NUM),
|
||||
("has_num", SampleBlock.HAS_NUM),
|
||||
("HAS_FLOAT", SampleBlock.HAS_FLOAT),
|
||||
])
|
||||
def test_lookup_key_parametrized(self, key: str, expected_member: EnumBase):
|
||||
"""Test lookup_key with various valid keys"""
|
||||
result = SampleBlock.lookup_key(key)
|
||||
assert result == expected_member
|
||||
|
||||
@pytest.mark.parametrize("value,expected_member", [
|
||||
("block_a", SampleBlock.BLOCK_A),
|
||||
("block_b", SampleBlock.BLOCK_B),
|
||||
(5, SampleBlock.HAS_NUM),
|
||||
(3.14, SampleBlock.HAS_FLOAT),
|
||||
("legacy_value", SampleBlock.LEGACY_KEY),
|
||||
])
|
||||
def test_lookup_value_parametrized(self, value: Any, expected_member: EnumBase):
|
||||
"""Test lookup_value with various valid values"""
|
||||
result = SampleBlock.lookup_value(value)
|
||||
assert result == expected_member
|
||||
|
||||
@pytest.mark.parametrize("input_any,expected_member", [
|
||||
("BLOCK_A", SampleBlock.BLOCK_A),
|
||||
("block_a", SampleBlock.BLOCK_A),
|
||||
("block_b", SampleBlock.BLOCK_B),
|
||||
(5, SampleBlock.HAS_NUM),
|
||||
(3.14, SampleBlock.HAS_FLOAT),
|
||||
(SampleBlock.BLOCK_A, SampleBlock.BLOCK_A), # Pass enum instance
|
||||
])
|
||||
def test_from_any_parametrized(self, input_any: Any, expected_member: EnumBase):
|
||||
"""Test from_any with various valid inputs"""
|
||||
result = SampleBlock.from_any(input_any)
|
||||
assert result == expected_member
|
||||
|
||||
@pytest.mark.parametrize("invalid_key", [
|
||||
"NONEXISTENT",
|
||||
"invalid",
|
||||
"123",
|
||||
"",
|
||||
"BLOCK_C",
|
||||
])
|
||||
def test_lookup_key_invalid_parametrized(self, invalid_key: str):
|
||||
"""Test lookup_key with various invalid keys"""
|
||||
with pytest.raises(ValueError, match="Invalid key"):
|
||||
SampleBlock.lookup_key(invalid_key)
|
||||
|
||||
@pytest.mark.parametrize("invalid_value", [
|
||||
"nonexistent",
|
||||
999,
|
||||
-1,
|
||||
0.0,
|
||||
"BLOCK_A", # This is a key name, not a value
|
||||
])
|
||||
def test_lookup_value_invalid_parametrized(self, invalid_value: Any):
|
||||
"""Test lookup_value with various invalid values"""
|
||||
with pytest.raises(ValueError, match="Invalid value"):
|
||||
SampleBlock.lookup_value(invalid_value)
|
||||
|
||||
|
||||
# Edge cases and special scenarios
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and special scenarios"""
|
||||
|
||||
def test_enum_with_single_member(self):
|
||||
"""Test EnumBase with only one member"""
|
||||
class SingleEnum(EnumBase):
|
||||
ONLY_ONE = "single"
|
||||
|
||||
result = SingleEnum.from_any("ONLY_ONE")
|
||||
assert result == SingleEnum.ONLY_ONE
|
||||
assert result.to_value() == "single"
|
||||
|
||||
def test_enum_iteration(self):
|
||||
"""Test iterating over enum members"""
|
||||
members = list(SampleBlock)
|
||||
assert len(members) == 5
|
||||
assert SampleBlock.BLOCK_A in members
|
||||
assert SampleBlock.BLOCK_B in members
|
||||
assert SampleBlock.HAS_NUM in members
|
||||
|
||||
def test_enum_membership(self):
|
||||
"""Test checking membership in enum"""
|
||||
assert SampleBlock.BLOCK_A in SampleBlock
|
||||
assert SampleBlock.HAS_NUM in SampleBlock
|
||||
|
||||
def test_enum_comparison(self):
|
||||
"""Test comparing enum members"""
|
||||
assert SampleBlock.BLOCK_A == SampleBlock.BLOCK_A
|
||||
assert SampleBlock.BLOCK_A != SampleBlock.BLOCK_B
|
||||
assert SampleBlock.from_any("BLOCK_A") == SampleBlock.BLOCK_A
|
||||
|
||||
def test_enum_identity(self):
|
||||
"""Test enum member identity"""
|
||||
member1 = SampleBlock.BLOCK_A
|
||||
member2 = SampleBlock.lookup_key("BLOCK_A")
|
||||
member3 = SampleBlock.from_any("BLOCK_A")
|
||||
|
||||
assert member1 is member2
|
||||
assert member1 is member3
|
||||
assert member2 is member3
|
||||
|
||||
def test_different_enum_classes(self):
|
||||
"""Test that different enum classes are distinct"""
|
||||
# Even if they have same keys/values, they're different
|
||||
class OtherEnum(EnumBase):
|
||||
BLOCK_A = "block_a"
|
||||
|
||||
result1 = SampleBlock.from_any("BLOCK_A")
|
||||
result2 = OtherEnum.from_any("BLOCK_A")
|
||||
|
||||
assert result1 != result2
|
||||
assert not isinstance(result1, type(result2))
|
||||
|
||||
def test_numeric_enum_operations(self):
|
||||
"""Test operations specific to numeric enums"""
|
||||
assert NumericEnum.FIRST.to_value() == 1
|
||||
assert NumericEnum.SECOND.to_value() == 2
|
||||
assert NumericEnum.THIRD.to_value() == 3
|
||||
|
||||
# Test from_any with integers
|
||||
assert NumericEnum.from_any(1) == NumericEnum.FIRST
|
||||
assert NumericEnum.from_any(2) == NumericEnum.SECOND
|
||||
|
||||
def test_mixed_value_types_in_same_enum(self):
|
||||
"""Test enum with mixed value types"""
|
||||
# SampleBlock already has mixed types (strings, int, float)
|
||||
assert isinstance(SampleBlock.BLOCK_A.to_value(), str)
|
||||
assert isinstance(SampleBlock.HAS_NUM.to_value(), int)
|
||||
assert isinstance(SampleBlock.HAS_FLOAT.to_value(), float)
|
||||
|
||||
def test_from_any_chained_calls(self):
|
||||
"""Test that from_any can be chained (idempotent)"""
|
||||
result1 = SampleBlock.from_any("BLOCK_A")
|
||||
result2 = SampleBlock.from_any(result1)
|
||||
result3 = SampleBlock.from_any(result2)
|
||||
|
||||
assert result1 == result2 == result3
|
||||
assert result1 is result2 is result3
|
||||
|
||||
|
||||
# Integration tests
|
||||
class TestIntegration:
|
||||
"""Integration tests combining multiple methods"""
|
||||
|
||||
def test_round_trip_key_lookup(self):
|
||||
"""Test round-trip from key to enum and back"""
|
||||
original_key = "BLOCK_A"
|
||||
enum_member = SampleBlock.lookup_key(original_key)
|
||||
result_name = str(enum_member)
|
||||
|
||||
assert result_name == original_key
|
||||
|
||||
def test_round_trip_value_lookup(self):
|
||||
"""Test round-trip from value to enum and back"""
|
||||
original_value = "block_a"
|
||||
enum_member = SampleBlock.lookup_value(original_value)
|
||||
result_value = enum_member.to_value()
|
||||
|
||||
assert result_value == original_value
|
||||
|
||||
def test_from_any_workflow(self):
|
||||
"""Test realistic workflow using from_any"""
|
||||
# Simulate receiving various types of input
|
||||
inputs = [
|
||||
"BLOCK_A", # Key as string
|
||||
"block_b", # Value as string
|
||||
5, # Numeric value
|
||||
SampleBlock.HAS_FLOAT, # Already an enum
|
||||
]
|
||||
|
||||
expected = [
|
||||
SampleBlock.BLOCK_A,
|
||||
SampleBlock.BLOCK_B,
|
||||
SampleBlock.HAS_NUM,
|
||||
SampleBlock.HAS_FLOAT,
|
||||
]
|
||||
|
||||
for input_val, expected_val in zip(inputs, expected):
|
||||
result = SampleBlock.from_any(input_val)
|
||||
assert result == expected_val
|
||||
|
||||
def test_enum_in_dictionary(self):
|
||||
"""Test using enum as dictionary key"""
|
||||
enum_dict = {
|
||||
SampleBlock.BLOCK_A: "Value A",
|
||||
SampleBlock.BLOCK_B: "Value B",
|
||||
SampleBlock.HAS_NUM: "Value Num",
|
||||
}
|
||||
|
||||
assert enum_dict[SampleBlock.BLOCK_A] == "Value A"
|
||||
block_b = SampleBlock.from_any("BLOCK_B")
|
||||
assert isinstance(block_b, SampleBlock)
|
||||
assert enum_dict[block_b] == "Value B"
|
||||
|
||||
def test_enum_in_set(self):
|
||||
"""Test using enum in a set"""
|
||||
enum_set = {SampleBlock.BLOCK_A, SampleBlock.BLOCK_B, SampleBlock.BLOCK_A}
|
||||
|
||||
assert len(enum_set) == 2 # BLOCK_A should be deduplicated
|
||||
assert SampleBlock.BLOCK_A in enum_set
|
||||
assert SampleBlock.from_any("BLOCK_B") in enum_set
|
||||
|
||||
|
||||
# Real-world usage scenarios
|
||||
class TestRealWorldScenarios:
|
||||
"""Test real-world usage scenarios from enum_test.py"""
|
||||
|
||||
def test_original_enum_test_scenario(self):
|
||||
"""Test the scenario from the original enum_test.py"""
|
||||
# BLOCK A: {SampleBlock.from_any('BLOCK_A')}
|
||||
result_a = SampleBlock.from_any('BLOCK_A')
|
||||
assert result_a == SampleBlock.BLOCK_A
|
||||
assert str(result_a) == "BLOCK_A"
|
||||
|
||||
# HAS NUM: {SampleBlock.from_any(5)}
|
||||
result_num = SampleBlock.from_any(5)
|
||||
assert result_num == SampleBlock.HAS_NUM
|
||||
assert result_num.to_value() == 5
|
||||
|
||||
# DIRECT BLOCK: {SampleBlock.BLOCK_A.name} -> {SampleBlock.BLOCK_A.value}
|
||||
assert SampleBlock.BLOCK_A.name == "BLOCK_A"
|
||||
assert SampleBlock.BLOCK_A.value == "block_a"
|
||||
|
||||
def test_config_value_parsing(self):
|
||||
"""Test parsing values from configuration (common use case)"""
|
||||
# Simulate config values that might come as strings
|
||||
config_values = ["OPTION_ONE", "option_two", "OPTION_THREE"]
|
||||
|
||||
results = [SimpleEnum.from_any(val) for val in config_values]
|
||||
|
||||
assert results[0] == SimpleEnum.OPTION_ONE
|
||||
assert results[1] == SimpleEnum.OPTION_TWO
|
||||
assert results[2] == SimpleEnum.OPTION_THREE
|
||||
|
||||
def test_api_response_mapping(self):
|
||||
"""Test mapping API response values to enum"""
|
||||
# Simulate API returning numeric codes
|
||||
api_codes = [1, 2, 3]
|
||||
|
||||
results = [NumericEnum.from_any(code) for code in api_codes]
|
||||
|
||||
assert results[0] == NumericEnum.FIRST
|
||||
assert results[1] == NumericEnum.SECOND
|
||||
assert results[2] == NumericEnum.THIRD
|
||||
|
||||
def test_validation_with_error_handling(self):
|
||||
"""Test validation with proper error handling"""
|
||||
valid_input = "BLOCK_A"
|
||||
invalid_input = "INVALID"
|
||||
|
||||
# Valid input should work
|
||||
result = SampleBlock.from_any(valid_input)
|
||||
assert result == SampleBlock.BLOCK_A
|
||||
|
||||
# Invalid input should raise ValueError
|
||||
try:
|
||||
SampleBlock.from_any(invalid_input)
|
||||
assert False, "Should have raised ValueError"
|
||||
except ValueError as e:
|
||||
assert "Could not find as key or value" in str(e)
|
||||
assert "INVALID" in str(e)
|
||||
Reference in New Issue
Block a user