savagedb_bot/db.py

1277 lines
65 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# db.py
import oracledb
from typing import Optional, Tuple
# import logging
from aiogram.types import User
class OracleDatabase:
def __init__(self, user: str, password: str, dsn: str):
self._user = user
self._password = password
self._dsn = dsn
self._pool: Optional[oracledb.ConnectionPool] = None
async def connect(self):
# Oracle does not support true async I/O; use threaded connection pool
self._pool = oracledb.create_pool(
user=self._user,
password=self._password,
dsn=self._dsn,
min=1,
max=4,
increment=1,
getmode=oracledb.POOL_GETMODE_WAIT
)
async def close(self):
if self._pool:
self._pool.close()
async def fetch_user(self, user_id: int):
# Manual async wrapper since oracledb is synchronous (threaded)
def _query():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute("SELECT id, name FROM users WHERE id = :id", {"id": user_id})
return cur.fetchone()
import asyncio
return await asyncio.to_thread(_query)
async def fetch_vin_info(self, vin: str) -> Tuple[str, str, str, int]:
# Manual async wrapper since oracledb is synchronous (threaded)
def _query():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
query = """
select 'None',
COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),
(select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make,
COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),
(select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model,
COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),
(select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year,
(select count(*) from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin) cnt
from (select substr(:vin,1,10) svin, :vin vin from dual) s
"""
print(f"[DB DEBUG] Executing query for VIN: {vin}")
print(f"[DB DEBUG] SVIN will be: {vin[:10]}")
cur.execute(query, {"vin": vin})
result = cur.fetchone()
print(f"[DB DEBUG] Raw result from database: {result}")
if result:
make, model, year, cnt = result[1], result[2], result[3], result[4]
print(f"[DB DEBUG] Parsed values - make: '{make}', model: '{model}', year: '{year}', cnt: {cnt}")
return make, model, year, cnt
print(f"[DB DEBUG] No result found, returning defaults")
return "UNKNOWN", "UNKNOWN", "UNKNOWN", 0
import asyncio
return await asyncio.to_thread(_query)
async def count_salvage_records(self, vin: str) -> int:
"""
Подсчитывает количество записей в таблице salvagedb.salvagedb для данного VIN
"""
def _query():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM salvagedb.salvagedb WHERE vin = :vin and svin =substr(:vin,1,10)", {"vin": vin})
result = cur.fetchone()
return result[0] if result else 0
import asyncio
return await asyncio.to_thread(_query)
async def fetch_salvage_detailed_info(self, vin: str) -> list:
"""
Получает детальную информацию о поврежденияхи истории из таблицы salvagedb.salvagedb
"""
def _query():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
query = """
SELECT
odo,
odos,
dem1,
dem2,
month||'/'||year as sale_date,
JSON_VALUE(jdata, '$.RepCost') AS j_rep_cost,
JSON_VALUE(jdata, '$.Runs_Drive') AS j_runs_drive,
JSON_VALUE(jdata, '$.Locate') AS j_locate,
(select count(*) from salvagedb.salvage_images si where si.vin = s.vin and fn =1) img_count
FROM salvagedb.salvagedb s
LEFT JOIN salvagedb.addinfo i ON s.num = i.numid
WHERE vin = :vin AND svin = substr(:vin, 1, 10)
ORDER BY year DESC, month DESC
"""
cur.execute(query, {"vin": vin})
results = cur.fetchall()
# Преобразуем результаты в список словарей
detailed_records = []
for row in results:
record = {
'odo': row[0],
'odos': row[1],
'dem1': row[2],
'dem2': row[3],
'sale_date': row[4],
'j_rep_cost': row[5],
'j_runs_drive': row[6],
'j_locate': row[7],
'img_count': row[8]
}
detailed_records.append(record)
return detailed_records
import asyncio
return await asyncio.to_thread(_query)
async def fetch_photo_paths(self, vin: str) -> list:
"""
Получает список путей к фотографиям для данного VIN
"""
def _query():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute("SELECT ipath FROM salvagedb.salvage_images WHERE fn = 1 AND vin = :vin", {"vin": vin})
results = cur.fetchall()
return [row[0] for row in results if row[0]] if results else []
import asyncio
return await asyncio.to_thread(_query)
async def fetch_detailed_vin_info(self, vin: str) -> dict:
# Manual async wrapper since oracledb is synchronous (threaded)
def _query():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
query = """
select dp.parameter_name,
dp.category_name,
d.value,
dp.endesc
from salvagedb.m_JSONS_FROM_NHTSA d
left join salvagedb.decode_params dp
on d.variableid = dp.parameter_id
where svin = substr(:vin, 1, 10)
order by dp.category_level
"""
# logging.info(f"Query: {query}")
cur.execute(query, {"vin": vin})
results = cur.fetchall()
# logging.info(f"Results: {results}")
# Organize data by categories
detailed_info = {
'basic_characteristics': {},
'engine_and_powertrain': {},
'active_safety': {},
'transmission': {},
'passive_safety': {},
'dimensions_and_construction': {},
'brake_system': {},
'lighting': {},
'additional_features': {},
'manufacturing_and_localization': {},
'ncsa_data': {},
'technical_information_and_errors': {},
'all_params': {} # Flat dictionary for easy access
}
for row in results:
param_name, category_name, value, description = row
if param_name and value:
# Create key from parameter name (lowercase, spaces to underscores)
key = param_name.lower().replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_').replace('-', '_')
# Add to flat dictionary
detailed_info['all_params'][key] = value
# Add to category-specific dictionary
if category_name:
category_key = category_name.lower().replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_').replace('-', '_')
if category_key in detailed_info:
detailed_info[category_key][key] = {
'value': value,
'description': description,
'param_name': param_name
}
return detailed_info
import asyncio
return await asyncio.to_thread(_query)
async def save_user(self, user: User, interaction_source: str = "bot") -> bool:
"""
Сохраняет или обновляет данные пользователя в базе данных
При первом взаимодействии создает запись, при последующих - обновляет
"""
def _save_user():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
# Проверяем, существует ли пользователь
cur.execute("SELECT id FROM bot_users WHERE id = :user_id", {"user_id": user.id})
existing_user = cur.fetchone()
if existing_user:
# Обновляем существующего пользователя
update_query = """
UPDATE bot_users SET
first_name = :first_name,
last_name = :last_name,
username = :username,
language_code = :language_code,
is_premium = :is_premium,
added_to_attachment_menu = :added_to_attachment_menu,
last_interaction_date = SYSDATE,
interaction_count = interaction_count + 1
WHERE id = :user_id
"""
params = {
"user_id": user.id,
"first_name": user.first_name,
"last_name": user.last_name,
"username": user.username,
"language_code": user.language_code,
"is_premium": 1 if user.is_premium else 0,
"added_to_attachment_menu": 1 if user.added_to_attachment_menu else 0
}
else:
# Создаем нового пользователя
insert_query = """
INSERT INTO bot_users (
id, first_name, last_name, username, language_code,
is_bot, is_premium, added_to_attachment_menu,
registration_source, first_interaction_date,
last_interaction_date, interaction_count
) VALUES (
:user_id, :first_name, :last_name, :username, :language_code,
:is_bot, :is_premium, :added_to_attachment_menu,
:registration_source, SYSDATE, SYSDATE, 1
)
"""
params = {
"user_id": user.id,
"first_name": user.first_name,
"last_name": user.last_name,
"username": user.username,
"language_code": user.language_code,
"is_bot": 1 if user.is_bot else 0,
"is_premium": 1 if user.is_premium else 0,
"added_to_attachment_menu": 1 if user.added_to_attachment_menu else 0,
"registration_source": interaction_source
}
cur.execute(insert_query, params)
conn.commit()
return True
cur.execute(update_query, params)
conn.commit()
return True
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _save_user)
except Exception as e:
print(f"Error saving user {user.id}: {e}")
return False
async def update_user_payment(self, user_id: int, payment_amount: float) -> bool:
"""
Обновляет данные о платежах пользователя
"""
def _update_payment():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
update_query = """
UPDATE bot_users SET
total_payments = total_payments + :amount,
successful_payments_count = successful_payments_count + 1,
last_interaction_date = SYSDATE
WHERE id = :user_id
"""
cur.execute(update_query, {
"user_id": user_id,
"amount": payment_amount
})
conn.commit()
return cur.rowcount > 0
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _update_payment)
except Exception as e:
print(f"Error updating payment for user {user_id}: {e}")
return False
async def get_user_stats(self, user_id: int) -> Optional[dict]:
"""
Получает статистику пользователя из базы данных
"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
query = """
SELECT
first_name, last_name, username, language_code,
is_premium, interaction_count, total_payments,
successful_payments_count, first_interaction_date,
last_interaction_date
FROM bot_users
WHERE id = :user_id
"""
cur.execute(query, {"user_id": user_id})
result = cur.fetchone()
if result:
return {
"first_name": result[0],
"last_name": result[1],
"username": result[2],
"language_code": result[3],
"is_premium": bool(result[4]),
"interaction_count": result[5],
"total_payments": float(result[6]) if result[6] else 0.0,
"successful_payments_count": result[7],
"first_interaction_date": result[8],
"last_interaction_date": result[9]
}
return None
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting stats for user {user_id}: {e}")
return None
async def get_users_summary(self) -> dict:
"""
Получает общую статистику по пользователям
"""
def _get_summary():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
summary_query = """
SELECT
COUNT(*) as total_users,
COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
SUM(total_payments) as total_revenue,
SUM(successful_payments_count) as total_transactions,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_last_24h,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_last_week
FROM bot_users
WHERE is_active = 1
"""
cur.execute(summary_query)
result = cur.fetchone()
return {
"total_users": result[0] or 0,
"premium_users": result[1] or 0,
"total_revenue": float(result[2]) if result[2] else 0.0,
"total_transactions": result[3] or 0,
"active_last_24h": result[4] or 0,
"active_last_week": result[5] or 0
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_summary)
except Exception as e:
print(f"Error getting users summary: {e}")
return {}
async def count_photo_records(self, vin: str) -> int:
"""
Подсчитывает количество фотографий для данного VIN
"""
def _query():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM salvagedb.salvage_images WHERE vin = :vin AND fn = 1", {"vin": vin})
result = cur.fetchone()
return result[0] if result else 0
import asyncio
return await asyncio.to_thread(_query)
async def save_payment_log(self, user: User, service_type: str, vin: str, payment_data: dict, service_result: dict = None) -> bool:
"""
Логирует операцию оплаты с полной информацией о пользователе, услуге и результате
Args:
user: Объект пользователя Telegram
service_type: Тип услуги ('decode_vin', 'check_salvage', 'get_photos')
vin: VIN номер автомобиля
payment_data: Данные о платеже (сумма, transaction_id, статус)
service_result: Результат предоставления услуги (количество данных, статус)
"""
def _save_log():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
insert_query = """
INSERT INTO payment_logs (
log_id, user_id, user_first_name, user_last_name, user_username,
user_language_code, user_is_premium, service_type, vin_number,
payment_amount, transaction_id, payment_status, payment_currency,
service_status, data_found_count, refund_status, refund_reason,
vehicle_make, vehicle_model, vehicle_year, error_message,
created_date, ip_address
) VALUES (
payment_logs_seq.NEXTVAL, :user_id, :user_first_name, :user_last_name, :user_username,
:user_language_code, :user_is_premium, :service_type, :vin_number,
:payment_amount, :transaction_id, :payment_status, :payment_currency,
:service_status, :data_found_count, :refund_status, :refund_reason,
:vehicle_make, :vehicle_model, :vehicle_year, :error_message,
SYSDATE, :ip_address
)
"""
params = {
"user_id": user.id,
"user_first_name": user.first_name,
"user_last_name": user.last_name,
"user_username": user.username,
"user_language_code": user.language_code,
"user_is_premium": 1 if user.is_premium else 0,
"service_type": service_type,
"vin_number": vin,
"payment_amount": payment_data.get('amount', 0),
"transaction_id": payment_data.get('transaction_id'),
"payment_status": payment_data.get('status', 'completed'),
"payment_currency": payment_data.get('currency', 'XTR'),
"service_status": service_result.get('status', 'success') if service_result else 'pending',
"data_found_count": service_result.get('data_count', 0) if service_result else 0,
"refund_status": payment_data.get('refund_status', 'no_refund'),
"refund_reason": payment_data.get('refund_reason'),
"vehicle_make": service_result.get('vehicle_make') if service_result else None,
"vehicle_model": service_result.get('vehicle_model') if service_result else None,
"vehicle_year": service_result.get('vehicle_year') if service_result else None,
"error_message": service_result.get('error') if service_result else None,
"ip_address": None # Telegram не предоставляет IP адреса
}
cur.execute(insert_query, params)
conn.commit()
return True
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _save_log)
except Exception as e:
print(f"Error saving payment log for user {user.id}: {e}")
return False
# ==============================================
# USER ANALYTICS METHODS
# ==============================================
async def get_users_general_stats(self) -> dict:
"""Общая статистика пользователей"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
query = """
SELECT
COUNT(*) as total_users,
COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
COUNT(CASE WHEN is_active = 1 THEN 1 END) as active_users,
COUNT(CASE WHEN is_blocked = 1 THEN 1 END) as blocked_users,
COUNT(CASE WHEN successful_payments_count > 0 THEN 1 END) as paying_users,
SUM(total_payments) as total_revenue,
SUM(successful_payments_count) as total_transactions,
SUM(interaction_count) as total_interactions,
AVG(interaction_count) as avg_interactions_per_user,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_24h,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_7d,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 30 THEN 1 END) as active_30d
FROM bot_users
"""
cur.execute(query)
result = cur.fetchone()
return {
"total_users": result[0] or 0,
"premium_users": result[1] or 0,
"active_users": result[2] or 0,
"blocked_users": result[3] or 0,
"paying_users": result[4] or 0,
"total_revenue": float(result[5]) if result[5] else 0.0,
"total_transactions": result[6] or 0,
"total_interactions": result[7] or 0,
"avg_interactions_per_user": round(float(result[8]), 2) if result[8] else 0.0,
"active_24h": result[9] or 0,
"active_7d": result[10] or 0,
"active_30d": result[11] or 0
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting general user stats: {e}")
return {}
async def get_users_growth_stats(self) -> dict:
"""Статистика роста пользователей"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
query = """
SELECT
COUNT(CASE WHEN created_at >= SYSDATE - 1 THEN 1 END) as new_today,
COUNT(CASE WHEN created_at >= SYSDATE - 7 THEN 1 END) as new_week,
COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_month,
COUNT(CASE WHEN created_at >= SYSDATE - 365 THEN 1 END) as new_year,
TO_CHAR(MIN(created_at), 'DD.MM.YYYY') as first_user_date,
ROUND((SYSDATE - MIN(created_at))) as days_since_start
FROM bot_users
"""
cur.execute(query)
result = cur.fetchone()
# Получаем рост по дням за последние 30 дней
daily_query = """
SELECT
TO_CHAR(created_at, 'DD.MM') as day,
COUNT(*) as count
FROM bot_users
WHERE created_at >= SYSDATE - 30
GROUP BY TO_CHAR(created_at, 'DD.MM'), TRUNC(created_at)
ORDER BY TRUNC(created_at) DESC
"""
cur.execute(daily_query)
daily_growth = cur.fetchall()
return {
"new_today": result[0] or 0,
"new_week": result[1] or 0,
"new_month": result[2] or 0,
"new_year": result[3] or 0,
"first_user_date": result[4] or "N/A",
"days_since_start": result[5] or 0,
"daily_growth": daily_growth[:10] if daily_growth else []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting growth stats: {e}")
return {}
async def get_users_premium_stats(self) -> dict:
"""Анализ Premium пользователей"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
query = """
SELECT
COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
COUNT(CASE WHEN is_premium = 0 THEN 1 END) as regular_users,
AVG(CASE WHEN is_premium = 1 THEN total_payments END) as premium_avg_payment,
AVG(CASE WHEN is_premium = 0 THEN total_payments END) as regular_avg_payment,
AVG(CASE WHEN is_premium = 1 THEN interaction_count END) as premium_avg_interactions,
AVG(CASE WHEN is_premium = 0 THEN interaction_count END) as regular_avg_interactions,
COUNT(CASE WHEN is_premium = 1 AND successful_payments_count > 0 THEN 1 END) as premium_paying,
COUNT(CASE WHEN is_premium = 0 AND successful_payments_count > 0 THEN 1 END) as regular_paying
FROM bot_users
WHERE is_active = 1
"""
cur.execute(query)
result = cur.fetchone()
premium_total = result[0] or 0
regular_total = result[1] or 0
return {
"premium_users": premium_total,
"regular_users": regular_total,
"premium_percentage": round((premium_total / (premium_total + regular_total) * 100), 2) if (premium_total + regular_total) > 0 else 0,
"premium_avg_payment": round(float(result[2]), 2) if result[2] else 0.0,
"regular_avg_payment": round(float(result[3]), 2) if result[3] else 0.0,
"premium_avg_interactions": round(float(result[4]), 2) if result[4] else 0.0,
"regular_avg_interactions": round(float(result[5]), 2) if result[5] else 0.0,
"premium_paying": result[6] or 0,
"regular_paying": result[7] or 0,
"premium_conversion": round((result[6] / premium_total * 100), 2) if premium_total > 0 else 0,
"regular_conversion": round((result[7] / regular_total * 100), 2) if regular_total > 0 else 0
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting premium stats: {e}")
return {}
async def get_users_geography_stats(self) -> dict:
"""География пользователей"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
# Топ языков
lang_query = """
SELECT
COALESCE(language_code, 'unknown') as language,
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM bot_users), 2) as percentage
FROM bot_users
GROUP BY language_code
ORDER BY COUNT(*) DESC
"""
cur.execute(lang_query)
languages = cur.fetchall()
# Статистика источников регистрации
source_query = """
SELECT
registration_source,
COUNT(*) as count
FROM bot_users
GROUP BY registration_source
ORDER BY COUNT(*) DESC
"""
cur.execute(source_query)
sources = cur.fetchall()
return {
"top_languages": languages[:10] if languages else [],
"total_languages": len(languages) if languages else 0,
"registration_sources": sources if sources else []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting geography stats: {e}")
return {}
async def get_users_activity_stats(self) -> dict:
"""Анализ активности пользователей"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
# Активность за разные периоды
activity_query = """
SELECT
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_1d,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 3 THEN 1 END) as active_3d,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_7d,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 14 THEN 1 END) as active_14d,
COUNT(CASE WHEN last_interaction_date >= SYSDATE - 30 THEN 1 END) as active_30d,
COUNT(CASE WHEN last_interaction_date < SYSDATE - 30 THEN 1 END) as inactive_30d
FROM bot_users
WHERE is_active = 1
"""
cur.execute(activity_query)
activity = cur.fetchone()
# Распределение по количеству взаимодействий
interaction_query = """
SELECT
CASE
WHEN interaction_count = 1 THEN 'Новички (1)'
WHEN interaction_count BETWEEN 2 AND 5 THEN 'Начинающие (2-5)'
WHEN interaction_count BETWEEN 6 AND 20 THEN 'Активные (6-20)'
WHEN interaction_count BETWEEN 21 AND 100 THEN 'Постоянные (21-100)'
ELSE 'Суперактивные (100+)'
END as category,
COUNT(*) as count
FROM bot_users
WHERE is_active = 1
GROUP BY
CASE
WHEN interaction_count = 1 THEN 'Новички (1)'
WHEN interaction_count BETWEEN 2 AND 5 THEN 'Начинающие (2-5)'
WHEN interaction_count BETWEEN 6 AND 20 THEN 'Активные (6-20)'
WHEN interaction_count BETWEEN 21 AND 100 THEN 'Постоянные (21-100)'
ELSE 'Суперактивные (100+)'
END
ORDER BY COUNT(*) DESC
"""
cur.execute(interaction_query)
interactions = cur.fetchall()
return {
"active_1d": activity[0] or 0,
"active_3d": activity[1] or 0,
"active_7d": activity[2] or 0,
"active_14d": activity[3] or 0,
"active_30d": activity[4] or 0,
"inactive_30d": activity[5] or 0,
"interaction_distribution": interactions if interactions else []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting activity stats: {e}")
return {}
async def get_users_sources_stats(self) -> dict:
"""Анализ источников пользователей"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
# Статистика по источникам
sources_query = """
SELECT
registration_source,
COUNT(*) as total_users,
COUNT(CASE WHEN successful_payments_count > 0 THEN 1 END) as paying_users,
AVG(total_payments) as avg_revenue,
AVG(interaction_count) as avg_interactions,
COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_last_30d
FROM bot_users
GROUP BY registration_source
ORDER BY COUNT(*) DESC
"""
cur.execute(sources_query)
sources = cur.fetchall()
# Топ реферальные источники (если есть)
referral_query = """
SELECT
registration_source,
COUNT(*) as count
FROM bot_users
WHERE registration_source NOT IN ('bot', 'direct', 'unknown')
GROUP BY registration_source
ORDER BY COUNT(*) DESC
FETCH FIRST 10 ROWS ONLY
"""
cur.execute(referral_query)
referrals = cur.fetchall()
return {
"source_breakdown": sources if sources else [],
"top_referrals": referrals if referrals else []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting sources stats: {e}")
return {}
# ==============================================
# FINANCIAL ANALYTICS METHODS
# ==============================================
async def get_finance_revenue_stats(self, admin_user_id: int = 0) -> dict:
"""Анализ доходов"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
# Общая статистика доходов (исключаем админа)
revenue_query = """
SELECT
SUM(payment_amount) as total_revenue,
COUNT(*) as total_transactions,
AVG(payment_amount) as avg_transaction,
COUNT(DISTINCT user_id) as unique_customers,
SUM(CASE WHEN created_date >= SYSDATE - 1 THEN payment_amount ELSE 0 END) as revenue_24h,
SUM(CASE WHEN created_date >= SYSDATE - 7 THEN payment_amount ELSE 0 END) as revenue_7d,
SUM(CASE WHEN created_date >= SYSDATE - 30 THEN payment_amount ELSE 0 END) as revenue_30d,
COUNT(CASE WHEN created_date >= SYSDATE - 1 THEN 1 END) as transactions_24h,
COUNT(CASE WHEN created_date >= SYSDATE - 7 THEN 1 END) as transactions_7d,
COUNT(CASE WHEN created_date >= SYSDATE - 30 THEN 1 END) as transactions_30d
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
"""
cur.execute(revenue_query, {"admin_user_id": admin_user_id})
revenue = cur.fetchone()
# Доходы по дням за последние 10 дней (исключаем админа)
daily_query = """
SELECT
TO_CHAR(created_date, 'DD.MM') as day,
SUM(payment_amount) as revenue,
COUNT(*) as transactions
FROM payment_logs
WHERE created_date >= SYSDATE - 10 AND payment_status = 'completed' AND user_id != :admin_user_id
GROUP BY TO_CHAR(created_date, 'DD.MM'), TRUNC(created_date)
ORDER BY TRUNC(created_date) DESC
"""
cur.execute(daily_query, {"admin_user_id": admin_user_id})
daily_revenue = cur.fetchall()
return {
"total_revenue": float(revenue[0]) if revenue[0] else 0.0,
"total_transactions": revenue[1] or 0,
"avg_transaction": round(float(revenue[2]), 2) if revenue[2] else 0.0,
"unique_customers": revenue[3] or 0,
"revenue_24h": float(revenue[4]) if revenue[4] else 0.0,
"revenue_7d": float(revenue[5]) if revenue[5] else 0.0,
"revenue_30d": float(revenue[6]) if revenue[6] else 0.0,
"transactions_24h": revenue[7] or 0,
"transactions_7d": revenue[8] or 0,
"transactions_30d": revenue[9] or 0,
"daily_revenue": daily_revenue[:10] if daily_revenue else []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting revenue stats: {e}")
return {}
async def get_finance_services_stats(self, admin_user_id: int = 0) -> dict:
"""Анализ доходов по услугам"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
try:
# Сначала проверим, есть ли данные в таблице (исключаем админа)
check_query = "SELECT COUNT(*) FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id"
cur.execute(check_query, {"admin_user_id": admin_user_id})
total_records = cur.fetchone()[0]
print(f"Total completed payment records: {total_records}")
if total_records == 0:
return {
"services_breakdown": [],
"trends": {}
}
# Статистика по типам услуг (исключаем админа)
services_query = """
SELECT
service_type,
COUNT(*) as transactions,
SUM(payment_amount) as revenue,
AVG(payment_amount) as avg_price,
COUNT(DISTINCT user_id) as unique_users,
COUNT(CASE WHEN service_status = 'success' THEN 1 END) as success_count,
COUNT(CASE WHEN refund_status != 'no_refund' THEN 1 END) as refunds,
AVG(data_found_count) as avg_data_found
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
GROUP BY service_type
ORDER BY SUM(payment_amount) DESC
"""
print(f"Executing services query: {services_query}")
cur.execute(services_query, {"admin_user_id": admin_user_id})
services = cur.fetchall()
print(f"Services result: {services}")
# Тренды по услугам за последние 30 дней - упрощенный запрос (исключаем админа)
trends_query = """
SELECT
service_type,
SUM(CASE WHEN created_date >= SYSDATE - 7 THEN 1 ELSE 0 END) as week_transactions,
SUM(CASE WHEN created_date >= SYSDATE - 30 THEN 1 ELSE 0 END) as month_transactions
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
GROUP BY service_type
"""
print(f"Executing trends query: {trends_query}")
cur.execute(trends_query, {"admin_user_id": admin_user_id})
trends_result = cur.fetchall()
print(f"Trends result: {trends_result}")
trends = {row[0]: {"week": row[1], "month": row[2]} for row in trends_result} if trends_result else {}
return {
"services_breakdown": services if services else [],
"trends": trends
}
except Exception as inner_e:
print(f"Inner error in services stats: {inner_e}")
# Возвращаем базовую структуру данных
return {
"services_breakdown": [],
"trends": {}
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting services stats: {e}")
return {
"services_breakdown": [],
"trends": {}
}
async def get_finance_conversion_stats(self, admin_user_id: int = 0) -> dict:
"""Анализ конверсии"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
try:
# Сначала получаем общее количество пользователей
users_query = "SELECT COUNT(DISTINCT id) FROM bot_users WHERE is_active = 1"
cur.execute(users_query)
total_users = cur.fetchone()[0] or 1
# Статистика по платежам (исключаем админа)
payment_query = """
SELECT
COUNT(DISTINCT user_id) as paying_users,
COUNT(*) as total_transactions,
COUNT(DISTINCT CASE WHEN user_is_premium = 1 THEN user_id END) as premium_buyers,
COUNT(DISTINCT CASE WHEN user_is_premium = 0 THEN user_id END) as regular_buyers,
AVG(payment_amount) as avg_purchase
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
"""
cur.execute(payment_query, {"admin_user_id": admin_user_id})
payment_result = cur.fetchone()
if not payment_result:
return {
"total_users": total_users,
"paying_users": 0,
"conversion_rate": 0.0,
"total_transactions": 0,
"premium_buyers": 0,
"regular_buyers": 0,
"avg_purchase": 0.0,
"one_time_buyers": 0,
"regular_buyers": 0,
"loyal_buyers": 0,
"avg_purchases_per_user": 0.0,
"repeat_rate": 0.0
}
paying_users = payment_result[0] or 0
# Повторные покупки (исключаем админа)
repeat_query = """
SELECT
COUNT(CASE WHEN purchase_count = 1 THEN 1 END) as one_time_buyers,
COUNT(CASE WHEN purchase_count BETWEEN 2 AND 5 THEN 1 END) as regular_buyers_count,
COUNT(CASE WHEN purchase_count > 5 THEN 1 END) as loyal_buyers,
AVG(purchase_count) as avg_purchases_per_user
FROM (
SELECT user_id, COUNT(*) as purchase_count
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
GROUP BY user_id
)
"""
cur.execute(repeat_query, {"admin_user_id": admin_user_id})
repeat = cur.fetchone()
return {
"total_users": total_users,
"paying_users": paying_users,
"conversion_rate": round(paying_users / total_users * 100, 2) if total_users > 0 else 0,
"total_transactions": payment_result[1] or 0,
"premium_buyers": payment_result[2] or 0,
"regular_buyers": payment_result[3] or 0,
"avg_purchase": round(float(payment_result[4]), 2) if payment_result[4] else 0.0,
"one_time_buyers": repeat[0] or 0 if repeat else 0,
"regular_buyers_repeat": repeat[1] or 0 if repeat else 0,
"loyal_buyers": repeat[2] or 0 if repeat else 0,
"avg_purchases_per_user": round(float(repeat[3]), 2) if repeat and repeat[3] else 0.0,
"repeat_rate": round(((repeat[1] or 0) + (repeat[2] or 0)) / paying_users * 100, 2) if paying_users > 0 and repeat else 0
}
except Exception as inner_e:
print(f"Inner error in conversion stats: {inner_e}")
return {
"total_users": 0,
"paying_users": 0,
"conversion_rate": 0.0,
"total_transactions": 0,
"premium_buyers": 0,
"regular_buyers": 0,
"avg_purchase": 0.0,
"one_time_buyers": 0,
"regular_buyers_repeat": 0,
"loyal_buyers": 0,
"avg_purchases_per_user": 0.0,
"repeat_rate": 0.0
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting conversion stats: {e}")
return {
"total_users": 0,
"paying_users": 0,
"conversion_rate": 0.0,
"total_transactions": 0,
"premium_buyers": 0,
"regular_buyers": 0,
"avg_purchase": 0.0,
"one_time_buyers": 0,
"regular_buyers_repeat": 0,
"loyal_buyers": 0,
"avg_purchases_per_user": 0.0,
"repeat_rate": 0.0
}
async def get_finance_refunds_stats(self, admin_user_id: int = 0) -> dict:
"""Анализ возвратов"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
# Статистика возвратов (исключаем админа)
refunds_query = """
SELECT
COUNT(*) as total_transactions,
COUNT(CASE WHEN refund_status != 'no_refund' THEN 1 END) as refund_count,
SUM(CASE WHEN refund_status != 'no_refund' THEN payment_amount ELSE 0 END) as refund_amount,
COUNT(CASE WHEN refund_status = 'auto_refund' THEN 1 END) as auto_refunds,
COUNT(CASE WHEN refund_status = 'manual_refund' THEN 1 END) as manual_refunds,
COUNT(CASE WHEN refund_status = 'admin_refund' THEN 1 END) as admin_refunds
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
"""
cur.execute(refunds_query, {"admin_user_id": admin_user_id})
refunds = cur.fetchone()
# Причины возвратов по типам услуг (исключаем админа)
reasons_query = """
SELECT
service_type,
refund_status,
COUNT(*) as count,
SUM(payment_amount) as amount
FROM payment_logs
WHERE refund_status != 'no_refund' AND payment_status = 'completed' AND user_id != :admin_user_id
GROUP BY service_type, refund_status
ORDER BY service_type, COUNT(*) DESC
"""
cur.execute(reasons_query, {"admin_user_id": admin_user_id})
reasons = cur.fetchall()
total_transactions = refunds[0] or 1
refund_count = refunds[1] or 0
return {
"total_transactions": total_transactions,
"refund_count": refund_count,
"refund_rate": round(refund_count / total_transactions * 100, 2),
"refund_amount": float(refunds[2]) if refunds[2] else 0.0,
"auto_refunds": refunds[3] or 0,
"manual_refunds": refunds[4] or 0,
"admin_refunds": refunds[5] or 0,
"refund_breakdown": reasons if reasons else []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting refunds stats: {e}")
return {}
async def get_finance_transactions_stats(self, admin_user_id: int = 0) -> dict:
"""Анализ транзакций"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
# Статистика транзакций (исключаем админа)
transactions_query = """
SELECT
COUNT(*) as total_attempts,
COUNT(CASE WHEN payment_status = 'completed' THEN 1 END) as completed_count,
COUNT(CASE WHEN payment_status = 'pending' THEN 1 END) as pending_count,
COUNT(CASE WHEN payment_status = 'failed' THEN 1 END) as failed_count,
COUNT(CASE WHEN service_status = 'success' THEN 1 END) as service_success_count,
COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data_count,
COUNT(CASE WHEN service_status = 'error' THEN 1 END) as service_error_count
FROM payment_logs
WHERE user_id != :admin_user_id
"""
cur.execute(transactions_query, {"admin_user_id": admin_user_id})
transactions = cur.fetchone()
# Статистика ошибок (исключаем админа)
errors_query = """
SELECT
service_type,
COUNT(CASE WHEN payment_status = 'failed' THEN 1 END) as payment_failures,
COUNT(CASE WHEN service_status = 'error' THEN 1 END) as service_errors,
COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data_cases
FROM payment_logs
WHERE user_id != :admin_user_id
GROUP BY service_type
"""
cur.execute(errors_query, {"admin_user_id": admin_user_id})
errors = cur.fetchall()
total_attempts = transactions[0] or 1
return {
"total_attempts": total_attempts,
"completed": transactions[1] or 0,
"pending": transactions[2] or 0,
"failed": transactions[3] or 0,
"payment_success_rate": round((transactions[1] or 0) / total_attempts * 100, 2),
"service_success": transactions[4] or 0,
"no_data": transactions[5] or 0,
"service_error": transactions[6] or 0,
"service_success_rate": round((transactions[4] or 0) / (transactions[1] or 1) * 100, 2),
"error_breakdown": errors if errors else []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting transactions stats: {e}")
return {}
async def get_finance_efficiency_stats(self, admin_user_id: int = 0) -> dict:
"""Анализ эффективности"""
def _get_stats():
with self._pool.acquire() as conn:
with conn.cursor() as cur:
try:
# Разбиваем сложный запрос на несколько простых
# 1. Средний доход на транзакцию (исключаем админа)
avg_revenue_query = """
SELECT AVG(payment_amount) as avg_revenue_per_transaction
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
"""
cur.execute(avg_revenue_query, {"admin_user_id": admin_user_id})
avg_revenue_result = cur.fetchone()
avg_revenue_per_transaction = round(float(avg_revenue_result[0]), 2) if avg_revenue_result and avg_revenue_result[0] else 0.0
# 2. Количество уникальных дней с транзакциями (исключаем админа)
days_query = """
SELECT COUNT(DISTINCT TRUNC(created_date)) as unique_days
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
"""
cur.execute(days_query, {"admin_user_id": admin_user_id})
days_result = cur.fetchone()
unique_days = days_result[0] if days_result and days_result[0] else 1
# 3. Общее количество транзакций (исключаем админа)
total_transactions_query = """
SELECT COUNT(*) as total_transactions
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
"""
cur.execute(total_transactions_query, {"admin_user_id": admin_user_id})
total_transactions_result = cur.fetchone()
total_transactions = total_transactions_result[0] if total_transactions_result else 0
avg_transactions_per_day = round(total_transactions / unique_days, 2) if unique_days > 0 else 0
# 4. Доход и транзакции на клиента (исключаем админа)
customer_stats_query = """
SELECT
SUM(payment_amount) as total_revenue,
COUNT(*) as total_transactions,
COUNT(DISTINCT user_id) as unique_customers
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
"""
cur.execute(customer_stats_query, {"admin_user_id": admin_user_id})
customer_result = cur.fetchone()
if customer_result and customer_result[2] and customer_result[2] > 0:
revenue_per_customer = round(float(customer_result[0]) / customer_result[2], 2)
transactions_per_customer = round(float(customer_result[1]) / customer_result[2], 2)
else:
revenue_per_customer = 0.0
transactions_per_customer = 0.0
# 5. Эффективность по часам дня (исключаем админа)
hourly_query = """
SELECT
EXTRACT(HOUR FROM created_date) as hour,
COUNT(*) as transactions,
SUM(payment_amount) as revenue
FROM payment_logs
WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 AND user_id != :admin_user_id
GROUP BY EXTRACT(HOUR FROM created_date)
ORDER BY EXTRACT(HOUR FROM created_date)
"""
cur.execute(hourly_query, {"admin_user_id": admin_user_id})
hourly = cur.fetchall()
# 6. Топ VIN по доходам (исключаем админа)
top_vins_query = """
SELECT
vin_number,
COUNT(*) as requests,
SUM(payment_amount) as revenue
FROM payment_logs
WHERE payment_status = 'completed' AND user_id != :admin_user_id
GROUP BY vin_number
ORDER BY SUM(payment_amount) DESC
FETCH FIRST 10 ROWS ONLY
"""
cur.execute(top_vins_query, {"admin_user_id": admin_user_id})
top_vins = cur.fetchall()
return {
"avg_revenue_per_transaction": avg_revenue_per_transaction,
"avg_transactions_per_day": avg_transactions_per_day,
"revenue_per_customer": revenue_per_customer,
"transactions_per_customer": transactions_per_customer,
"hourly_distribution": hourly if hourly else [],
"top_vins": top_vins[:10] if top_vins else []
}
except Exception as inner_e:
print(f"Inner error in efficiency stats: {inner_e}")
return {
"avg_revenue_per_transaction": 0.0,
"avg_transactions_per_day": 0.0,
"revenue_per_customer": 0.0,
"transactions_per_customer": 0.0,
"hourly_distribution": [],
"top_vins": []
}
try:
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _get_stats)
except Exception as e:
print(f"Error getting efficiency stats: {e}")
return {
"avg_revenue_per_transaction": 0.0,
"avg_transactions_per_day": 0.0,
"revenue_per_customer": 0.0,
"transactions_per_customer": 0.0,
"hourly_distribution": [],
"top_vins": []
}