Add SQLite IO class
This is a very basic class without many helper functions added yet Add to the CoreLibs so when we develop it further it can be used by all projects
This commit is contained in:
0
src/corelibs/db_handling/__init__.py
Normal file
0
src/corelibs/db_handling/__init__.py
Normal file
214
src/corelibs/db_handling/sqlite_io.py
Normal file
214
src/corelibs/db_handling/sqlite_io.py
Normal file
@@ -0,0 +1,214 @@
|
||||
"""
|
||||
SQLite DB::IO
|
||||
Will be moved to the CoreLibs
|
||||
also method names are subject to change
|
||||
"""
|
||||
|
||||
# import gc
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal, TYPE_CHECKING
|
||||
import sqlite3
|
||||
from corelibs.debug_handling.debug_helpers import call_stack
|
||||
if TYPE_CHECKING:
|
||||
from corelibs.logging_handling.log import Logger
|
||||
|
||||
|
||||
class SQLiteIO():
|
||||
"""Mini SQLite interface"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
log: 'Logger',
|
||||
db_name: str | Path,
|
||||
autocommit: bool = False,
|
||||
enable_fkey: bool = True,
|
||||
row_factory: str | None = None
|
||||
):
|
||||
self.log = log
|
||||
self.db_name = db_name
|
||||
self.autocommit = autocommit
|
||||
self.enable_fkey = enable_fkey
|
||||
self.row_factory = row_factory
|
||||
self.conn: sqlite3.Connection | None = self.db_connect()
|
||||
|
||||
# def __del__(self):
|
||||
# self.db_close()
|
||||
|
||||
def db_connect(self) -> sqlite3.Connection | None:
|
||||
"""
|
||||
Connect to SQLite database, create if it doesn't exist
|
||||
"""
|
||||
try:
|
||||
# Connect to database (creates if doesn't exist)
|
||||
self.conn = sqlite3.connect(self.db_name, autocommit=self.autocommit)
|
||||
self.conn.setconfig(sqlite3.SQLITE_DBCONFIG_ENABLE_FKEY, True)
|
||||
# self.conn.execute("PRAGMA journal_mode=WAL")
|
||||
# self.log.debug(f"Connected to database: {self.db_name}")
|
||||
|
||||
def dict_factory(cursor: sqlite3.Cursor, row: list[Any]):
|
||||
fields = [column[0] for column in cursor.description]
|
||||
return dict(zip(fields, row))
|
||||
|
||||
match self.row_factory:
|
||||
case 'Row':
|
||||
self.conn.row_factory = sqlite3.Row
|
||||
case 'Dict':
|
||||
self.conn.row_factory = dict_factory
|
||||
case _:
|
||||
self.conn.row_factory = None
|
||||
|
||||
return self.conn
|
||||
except (sqlite3.Error, sqlite3.OperationalError) as e:
|
||||
self.log.error(f"Error connecting to database [{type(e).__name__}] [{self.db_name}]: {e} [{call_stack()}]")
|
||||
self.log.error(f"Error code: {e.sqlite_errorcode if hasattr(e, 'sqlite_errorcode') else 'N/A'}")
|
||||
self.log.error(f"Error name: {e.sqlite_errorname if hasattr(e, 'sqlite_errorname') else 'N/A'}")
|
||||
return None
|
||||
|
||||
def db_close(self):
|
||||
"""close connection"""
|
||||
if self.conn is not None:
|
||||
self.conn.close()
|
||||
self.conn = None
|
||||
|
||||
def db_connected(self) -> bool:
|
||||
"""
|
||||
Return True if db connection is not none
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
return True if self.conn else False
|
||||
|
||||
def __content_exists(self, content_name: str, sql_type: str) -> bool:
|
||||
"""
|
||||
Check if some content name for a certain type exists
|
||||
|
||||
Arguments:
|
||||
content_name {str} -- _description_
|
||||
sql_type {str} -- _description_
|
||||
|
||||
Returns:
|
||||
bool -- _description_
|
||||
"""
|
||||
if self.conn is None:
|
||||
return False
|
||||
try:
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute("""
|
||||
SELECT name
|
||||
FROM sqlite_master
|
||||
WHERE type = ? AND name = ?
|
||||
""", (sql_type, content_name,))
|
||||
return cursor.fetchone() is not None
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error checking table [{content_name}/{sql_type}] existence: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def table_exists(self, table_name: str) -> bool:
|
||||
"""
|
||||
Check if a table exists in the database
|
||||
"""
|
||||
return self.__content_exists(table_name, 'table')
|
||||
|
||||
def trigger_exists(self, trigger_name: str) -> bool:
|
||||
"""
|
||||
Check if a triggere exits
|
||||
"""
|
||||
return self.__content_exists(trigger_name, 'trigger')
|
||||
|
||||
def index_exists(self, index_name: str) -> bool:
|
||||
"""
|
||||
Check if a triggere exits
|
||||
"""
|
||||
return self.__content_exists(index_name, 'index')
|
||||
|
||||
def meta_data_detail(self, table_name: str) -> list[tuple[Any, ...]] | list[dict[str, Any]] | Literal[False]:
|
||||
"""table detail"""
|
||||
query_show_table = """
|
||||
SELECT
|
||||
ti.cid, ti.name, ti.type, ti.'notnull', ti.dflt_value, ti.pk,
|
||||
il_ii.idx_name, il_ii.idx_unique, il_ii.idx_origin, il_ii.idx_partial
|
||||
FROM
|
||||
sqlite_schema AS m,
|
||||
pragma_table_info(m.name) AS ti
|
||||
LEFT JOIN (
|
||||
SELECT
|
||||
il.name AS idx_name, il.'unique' AS idx_unique, il.origin AS idx_origin, il.partial AS idx_partial,
|
||||
ii.cid AS tbl_cid
|
||||
FROM
|
||||
sqlite_schema AS m,
|
||||
pragma_index_list(m.name) AS il,
|
||||
pragma_index_info(il.name) AS ii
|
||||
WHERE m.name = ?1
|
||||
) AS il_ii ON (ti.cid = il_ii.tbl_cid)
|
||||
WHERE
|
||||
m.name = ?1
|
||||
"""
|
||||
return self.execute_query(query_show_table, (table_name,))
|
||||
|
||||
def execute_cursor(
|
||||
self, query: str, params: tuple[Any, ...] | None = None
|
||||
) -> sqlite3.Cursor | Literal[False]:
|
||||
"""execute a cursor, used in execute query or return one and for fetch_row"""
|
||||
if self.conn is None:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
cursor = self.conn.cursor()
|
||||
if params:
|
||||
cursor.execute(query, params)
|
||||
else:
|
||||
cursor.execute(query)
|
||||
return cursor
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during executing cursor [{query}:{params}]: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def execute_query(
|
||||
self, query: str, params: tuple[Any, ...] | None = None
|
||||
) -> list[tuple[Any, ...]] | list[dict[str, Any]] | Literal[False]:
|
||||
"""query execute with or without params, returns result"""
|
||||
if self.conn is None:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
if (cursor := self.execute_cursor(query, params)) is False:
|
||||
return False
|
||||
# fetch before commit because we need to get the RETURN before
|
||||
result = cursor.fetchall()
|
||||
# this is for INSERT/UPDATE/CREATE only
|
||||
self.conn.commit()
|
||||
return result
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during executing query [{query}:{params}]: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def return_one(
|
||||
self, query: str, params: tuple[Any, ...] | None = None
|
||||
) -> tuple[Any, ...] | dict[str, Any] | Literal[False] | None:
|
||||
"""return one row, only for SELECT"""
|
||||
if self.conn is None:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
if (cursor := self.execute_cursor(query, params)) is False:
|
||||
return False
|
||||
return cursor.fetchone()
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during return one: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
def fetch_row(
|
||||
self, cursor: sqlite3.Cursor | Literal[False]
|
||||
) -> tuple[Any, ...] | dict[str, Any] | Literal[False] | None:
|
||||
"""read from cursor"""
|
||||
if self.conn is None or cursor is False:
|
||||
self.log.warning(f"No connection [{call_stack()}]")
|
||||
return False
|
||||
try:
|
||||
return cursor.fetchone()
|
||||
except sqlite3.Error as e:
|
||||
self.log.error(f"Error during fetch row: {e} [{call_stack()}]")
|
||||
return False
|
||||
|
||||
# __END__
|
||||
Reference in New Issue
Block a user