91 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Базовый класс для работы с Oracle Database
"""
import asyncio
import logging
import oracledb
from typing import Optional
from config.settings import DB_CONFIG
class OracleDatabase:
"""Базовый класс для работы с Oracle Database"""
def __init__(self, user: str = None, password: str = None, dsn: str = None):
"""
Инициализация подключения к БД
Если параметры не переданы, используются настройки из конфига
"""
self._user = user or DB_CONFIG["user"]
self._password = password or DB_CONFIG["password"]
self._dsn = dsn or DB_CONFIG["dsn"]
self._pool: Optional[oracledb.ConnectionPool] = None
async def connect(self):
"""Создание пула соединений с БД"""
try:
# Oracle does not support true async I/O; use threaded connection pool
self._pool = oracledb.create_pool(
user=self._user,
password=self._password,
dsn=self._dsn,
min=1,
max=4,
increment=1,
getmode=oracledb.POOL_GETMODE_WAIT
)
logging.info("Oracle database connection pool created successfully")
except Exception as e:
logging.error(f"Failed to create Oracle database connection pool: {e}")
raise
async def close(self):
"""Закрытие пула соединений"""
if self._pool:
try:
self._pool.close()
logging.info("Oracle database connection pool closed")
except Exception as e:
logging.error(f"Error closing Oracle database connection pool: {e}")
def _execute_query(self, query_func):
"""
Базовый wrapper для выполнения синхронных запросов в асинхронном контексте
Args:
query_func: функция, которая выполняет запрос к БД
Returns:
результат выполнения функции
"""
return asyncio.to_thread(query_func)
def _get_connection(self):
"""Получение соединения из пула"""
if not self._pool:
raise RuntimeError("Database connection pool is not initialized. Call connect() first.")
return self._pool.acquire()
async def execute_query_with_logging(self, query: str, params: dict = None, operation_name: str = "query"):
"""
Выполняет запрос с логированием ошибок SQL
Args:
query: SQL запрос
params: параметры запроса
operation_name: название операции для логирования
Returns:
результат запроса или None при ошибке
"""
def _execute():
try:
with self._get_connection() as conn:
with conn.cursor() as cur:
cur.execute(query, params or {})
return cur.fetchall()
except Exception as e:
logging.error(f"SQL Error in {operation_name}:")
logging.error(f"Query: {query}")
logging.error(f"Params: {params}")
logging.error(f"Error: {e}")
raise
return await self._execute_query(_execute)