# savagedb_bot/db.py
#
# NOTE(review): the repository viewer flagged this file as containing
# ambiguous Unicode characters (characters that can be confused with other
# characters) - presumably the non-ASCII text in comments and docstrings;
# verify that no identifiers or string literals are affected.
# db.py
import asyncio
import logging
from typing import Optional, Tuple

import oracledb
from aiogram.types import User
class OracleDatabase:
    """Data-access layer over a python-oracledb connection pool.

    The synchronous ``oracledb`` API is used throughout; every public
    coroutine pushes its blocking work to a worker thread with
    ``asyncio.to_thread`` so the event loop is not blocked while a query runs.
    """

    _logger = logging.getLogger(__name__)

    # Turns a display name into a dictionary key: spaces, slashes and dashes
    # become underscores, parentheses are dropped.
    _KEY_TRANSLATION = str.maketrans(
        {" ": "_", "/": "_", "-": "_", "(": None, ")": None}
    )

    # Category buckets returned by fetch_detailed_vin_info (besides 'all_params').
    _DETAIL_CATEGORIES = (
        "basic_characteristics",
        "engine_and_powertrain",
        "active_safety",
        "transmission",
        "passive_safety",
        "dimensions_and_construction",
        "brake_system",
        "lighting",
        "additional_features",
        "manufacturing_and_localization",
        "ncsa_data",
        "technical_information_and_errors",
    )

    # Keys of each record from fetch_salvage_detailed_info, in SELECT order.
    _SALVAGE_FIELDS = (
        "odo",
        "odos",
        "dem1",
        "dem2",
        "sale_date",
        "j_rep_cost",
        "j_runs_drive",
        "j_locate",
        "img_count",
    )

    # Keys of the dict from get_user_stats, in SELECT order.
    _USER_STATS_FIELDS = (
        "first_name",
        "last_name",
        "username",
        "language_code",
        "is_premium",
        "interaction_count",
        "total_payments",
        "successful_payments_count",
        "first_interaction_date",
        "last_interaction_date",
    )

    def __init__(self, user: str, password: str, dsn: str):
        """Store the credentials; no connection is made until ``connect()``."""
        self._user = user
        self._password = password
        self._dsn = dsn
        self._pool: Optional[oracledb.ConnectionPool] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_key(name: str) -> str:
        """Lower-case *name* and make it usable as a dictionary key."""
        return name.lower().translate(OracleDatabase._KEY_TRANSLATION)

    def _require_pool(self):
        """Return the pool, or raise ``RuntimeError`` if ``connect()`` was not called."""
        if self._pool is None:
            raise RuntimeError(
                "OracleDatabase is not connected; call connect() first"
            )
        return self._pool

    async def _run(self, work):
        """Run ``work(conn, cur)`` in a worker thread and return its result.

        A connection is acquired from the pool and a cursor opened for the
        duration of the call; both are released when ``work`` returns or
        raises. Exceptions from ``work`` propagate to the awaiting caller.
        """
        pool = self._require_pool()

        def _job():
            with pool.acquire() as conn:
                with conn.cursor() as cur:
                    return work(conn, cur)

        return await asyncio.to_thread(_job)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def connect(self):
        """Create the connection pool (1 to 4 connections, grown one at a time).

        ``POOL_GETMODE_WAIT`` makes ``acquire()`` wait for a free connection
        when the pool is exhausted. Pool creation is a blocking call, so it
        is performed in a worker thread.
        """
        self._pool = await asyncio.to_thread(
            oracledb.create_pool,
            user=self._user,
            password=self._password,
            dsn=self._dsn,
            min=1,
            max=4,
            increment=1,
            getmode=oracledb.POOL_GETMODE_WAIT,
        )

    async def close(self):
        """Close the pool if one is open; calling it again is a no-op."""
        if self._pool is not None:
            await asyncio.to_thread(self._pool.close)
            # Drop the reference only after a successful close so a failed
            # close can be retried.
            self._pool = None

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    async def fetch_user(self, user_id: int):
        """Return the ``(id, name)`` row from ``users`` for *user_id*, or ``None``."""

        def _query(_conn, cur):
            cur.execute(
                "SELECT id, name FROM users WHERE id = :id", {"id": user_id}
            )
            return cur.fetchone()

        return await self._run(_query)

    async def fetch_vin_info(self, vin: str) -> Tuple[str, str, str, int]:
        """Return ``(make, model, year, cnt)`` for *vin*.

        Make, model and year come from ``m_JSONS_FROM_NHTSA`` when present,
        otherwise from ``vind2``, otherwise the literal ``'UNKNOWN'``.
        ``cnt`` is the number of ``m_JSONS_FROM_NHTSA`` rows for the first
        ten characters of the VIN.
        """
        query = """
            select 'None',
            COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),
            (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make,
            COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),
            (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model,
            COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),
            (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year,
            (select count(*) from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin) cnt
            from (select substr(:vin,1,10) svin, :vin vin from dual) s
        """

        def _query(_conn, cur):
            self._logger.debug("Executing VIN info query for VIN: %s", vin)
            cur.execute(query, {"vin": vin})
            row = cur.fetchone()
            self._logger.debug("VIN info raw result: %r", row)
            if not row:
                return "UNKNOWN", "UNKNOWN", "UNKNOWN", 0
            # The first column is a constant placeholder and is not returned.
            _placeholder, make, model, year, cnt = row
            return make, model, year, cnt

        return await self._run(_query)

    async def count_salvage_records(self, vin: str) -> int:
        """Count the rows in ``salvagedb.salvagedb`` for the given VIN."""

        def _query(_conn, cur):
            cur.execute(
                "SELECT COUNT(*) FROM salvagedb.salvagedb WHERE vin = :vin and svin =substr(:vin,1,10)",
                {"vin": vin},
            )
            row = cur.fetchone()
            return row[0] if row else 0

        return await self._run(_query)

    async def fetch_salvage_detailed_info(self, vin: str) -> list:
        """Return damage/history records for *vin* from ``salvagedb.salvagedb``.

        Each record is a dict keyed by ``odo``, ``odos``, ``dem1``, ``dem2``,
        ``sale_date``, ``j_rep_cost``, ``j_runs_drive``, ``j_locate`` and
        ``img_count``; records are ordered newest first (year, then month).
        """
        query = """
            SELECT
            odo,
            odos,
            dem1,
            dem2,
            month||'/'||year as sale_date,
            JSON_VALUE(jdata, '$.RepCost') AS j_rep_cost,
            JSON_VALUE(jdata, '$.Runs_Drive') AS j_runs_drive,
            JSON_VALUE(jdata, '$.Locate') AS j_locate,
            (select count(*) from salvagedb.salvage_images si where si.vin = s.vin and fn =1) img_count
            FROM salvagedb.salvagedb s
            LEFT JOIN salvagedb.addinfo i ON s.num = i.numid
            WHERE vin = :vin AND svin = substr(:vin, 1, 10)
            ORDER BY year DESC, month DESC
        """

        def _query(_conn, cur):
            cur.execute(query, {"vin": vin})
            # Pair every column with its key, in SELECT order.
            return [dict(zip(self._SALVAGE_FIELDS, row)) for row in cur.fetchall()]

        return await self._run(_query)

    async def fetch_photo_paths(self, vin: str) -> list:
        """Return the non-empty ``ipath`` values of ``fn = 1`` images for *vin*."""

        def _query(_conn, cur):
            cur.execute(
                "SELECT ipath FROM salvagedb.salvage_images WHERE fn = 1 AND vin = :vin",
                {"vin": vin},
            )
            return [row[0] for row in cur.fetchall() if row[0]]

        return await self._run(_query)

    async def fetch_detailed_vin_info(self, vin: str) -> dict:
        """Return decoded NHTSA parameters for *vin*, grouped by category.

        The result has one dict per known category plus ``'all_params'``, a
        flat ``key -> value`` mapping. Category entries map the normalized
        parameter key to ``{'value', 'description', 'param_name'}``. Rows with
        an empty parameter name or value are skipped; rows whose category is
        not one of the known categories only appear in ``'all_params'``.
        """
        query = """
            select dp.parameter_name,
            dp.category_name,
            d.value,
            dp.endesc
            from salvagedb.m_JSONS_FROM_NHTSA d
            left join salvagedb.decode_params dp
            on d.variableid = dp.parameter_id
            where svin = substr(:vin, 1, 10)
            order by dp.category_level
        """

        def _query(_conn, cur):
            cur.execute(query, {"vin": vin})
            detailed_info = {category: {} for category in self._DETAIL_CATEGORIES}
            flat = detailed_info["all_params"] = {}
            for param_name, category_name, value, description in cur.fetchall():
                if not (param_name and value):
                    continue
                key = self._normalize_key(param_name)
                flat[key] = value
                if not category_name:
                    continue
                category_key = self._normalize_key(category_name)
                # Only real categories get a nested entry; a category that
                # happens to normalize to 'all_params' must not overwrite
                # the flat mapping.
                if category_key in self._DETAIL_CATEGORIES:
                    detailed_info[category_key][key] = {
                        "value": value,
                        "description": description,
                        "param_name": param_name,
                    }
            return detailed_info

        return await self._run(_query)

    # ------------------------------------------------------------------
    # Bot users
    # ------------------------------------------------------------------
    # The annotation is a string so that defining the class does not require
    # evaluating the aiogram type.
    async def save_user(self, user: "User", interaction_source: str = "bot") -> bool:
        """Create or refresh the ``bot_users`` row for *user*.

        An existing row gets its profile fields refreshed, its
        ``last_interaction_date`` set to now and its ``interaction_count``
        incremented. When no row exists one is inserted with
        ``registration_source`` set to *interaction_source*.

        Returns ``True`` on success and ``False`` if anything fails (the
        error is logged).
        """
        update_query = """
            UPDATE bot_users SET
            first_name = :first_name,
            last_name = :last_name,
            username = :username,
            language_code = :language_code,
            is_premium = :is_premium,
            added_to_attachment_menu = :added_to_attachment_menu,
            last_interaction_date = SYSDATE,
            interaction_count = interaction_count + 1
            WHERE id = :user_id
        """
        insert_query = """
            INSERT INTO bot_users (
            id, first_name, last_name, username, language_code,
            is_bot, is_premium, added_to_attachment_menu,
            registration_source, first_interaction_date,
            last_interaction_date, interaction_count
            ) VALUES (
            :user_id, :first_name, :last_name, :username, :language_code,
            :is_bot, :is_premium, :added_to_attachment_menu,
            :registration_source, SYSDATE, SYSDATE, 1
            )
        """

        def _upsert(conn, cur):
            profile = {
                "user_id": user.id,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "username": user.username,
                "language_code": user.language_code,
                "is_premium": 1 if user.is_premium else 0,
                "added_to_attachment_menu": 1 if user.added_to_attachment_menu else 0,
            }
            # Try the update first: returning users need a single statement,
            # and rowcount tells us whether the row exists.
            cur.execute(update_query, profile)
            if cur.rowcount == 0:
                cur.execute(
                    insert_query,
                    {
                        **profile,
                        "is_bot": 1 if user.is_bot else 0,
                        "registration_source": interaction_source,
                    },
                )
            conn.commit()
            return True

        try:
            return await self._run(_upsert)
        except Exception:
            self._logger.exception(
                "Error saving user %s", getattr(user, "id", None)
            )
            return False

    async def update_user_payment(self, user_id: int, payment_amount: float) -> bool:
        """Record a successful payment of *payment_amount* for *user_id*.

        Returns ``True`` when a row was updated, ``False`` when the user does
        not exist or the update failed (the error is logged).
        """
        # NVL keeps the counters working if the columns hold NULL instead of 0.
        update_query = """
            UPDATE bot_users SET
            total_payments = NVL(total_payments, 0) + :amount,
            successful_payments_count = NVL(successful_payments_count, 0) + 1,
            last_interaction_date = SYSDATE
            WHERE id = :user_id
        """

        def _update(conn, cur):
            cur.execute(
                update_query, {"user_id": user_id, "amount": payment_amount}
            )
            conn.commit()
            return cur.rowcount > 0

        try:
            return await self._run(_update)
        except Exception:
            self._logger.exception("Error updating payment for user %s", user_id)
            return False

    async def get_user_stats(self, user_id: int) -> Optional[dict]:
        """Return the stored statistics for *user_id*.

        Returns ``None`` when the user is unknown or the query fails (the
        error is logged). ``is_premium`` is converted to ``bool`` and
        ``total_payments`` to ``float`` (``0.0`` when empty).
        """
        query = """
            SELECT
            first_name, last_name, username, language_code,
            is_premium, interaction_count, total_payments,
            successful_payments_count, first_interaction_date,
            last_interaction_date
            FROM bot_users
            WHERE id = :user_id
        """

        def _query(_conn, cur):
            cur.execute(query, {"user_id": user_id})
            row = cur.fetchone()
            if not row:
                return None
            stats = dict(zip(self._USER_STATS_FIELDS, row))
            stats["is_premium"] = bool(stats["is_premium"])
            payments = stats["total_payments"]
            stats["total_payments"] = float(payments) if payments else 0.0
            return stats

        try:
            return await self._run(_query)
        except Exception:
            self._logger.exception("Error getting stats for user %s", user_id)
            return None

    async def get_users_summary(self) -> dict:
        """Return aggregate statistics over active ``bot_users`` rows.

        Keys: ``total_users``, ``premium_users``, ``total_revenue``,
        ``total_transactions``, ``active_last_24h``, ``active_last_week``.
        Returns an empty dict if the query fails (the error is logged).
        """
        summary_query = """
            SELECT
            COUNT(*) as total_users,
            COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
            SUM(total_payments) as total_revenue,
            SUM(successful_payments_count) as total_transactions,
            COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_last_24h,
            COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_last_week
            FROM bot_users
            WHERE is_active = 1
        """

        def _query(_conn, cur):
            cur.execute(summary_query)
            total, premium, revenue, transactions, last_day, last_week = cur.fetchone()
            # SUM() yields NULL on an empty table, hence the fallbacks.
            return {
                "total_users": total or 0,
                "premium_users": premium or 0,
                "total_revenue": float(revenue) if revenue else 0.0,
                "total_transactions": transactions or 0,
                "active_last_24h": last_day or 0,
                "active_last_week": last_week or 0,
            }

        try:
            return await self._run(_query)
        except Exception:
            self._logger.exception("Error getting users summary")
            return {}

    async def count_photo_records(self, vin: str) -> int:
        """Count the ``fn = 1`` photos in ``salvagedb.salvage_images`` for *vin*."""

        def _query(_conn, cur):
            cur.execute(
                "SELECT COUNT(*) FROM salvagedb.salvage_images WHERE vin = :vin AND fn = 1",
                {"vin": vin},
            )
            row = cur.fetchone()
            return row[0] if row else 0

        return await self._run(_query)