savagedb_bot/db.py

777 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# db.py
import asyncio
import logging
from typing import Optional, Tuple

import oracledb
from aiogram.types import User
logger = logging.getLogger(__name__)


class OracleDatabase:
    """Async facade over a synchronous ``oracledb`` connection pool.

    Every public coroutine runs its blocking database work in a worker
    thread (``asyncio.to_thread``) so the event loop is never blocked.
    ``connect()`` must be awaited before any query method is used.
    """

    # Category buckets returned by fetch_detailed_vin_info (normalized names).
    _DETAIL_CATEGORIES = (
        'basic_characteristics',
        'engine_and_powertrain',
        'active_safety',
        'transmission',
        'passive_safety',
        'dimensions_and_construction',
        'brake_system',
        'lighting',
        'additional_features',
        'manufacturing_and_localization',
        'ncsa_data',
        'technical_information_and_errors',
    )

    # Spaces, slashes and dashes become underscores; parentheses are dropped.
    _KEY_TRANSLATION = str.maketrans(
        {' ': '_', '/': '_', '-': '_', '(': None, ')': None}
    )

    def __init__(self, user: str, password: str, dsn: str):
        """Store the credentials; no connection is opened until connect()."""
        self._user = user
        self._password = password
        self._dsn = dsn
        self._pool: Optional[oracledb.ConnectionPool] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @classmethod
    def _normalize_key(cls, name: str) -> str:
        """Turn a human-readable name into a lowercase snake-style key."""
        return name.lower().translate(cls._KEY_TRANSLATION)

    def _with_cursor(self, work):
        """Acquire a pooled connection and cursor, then call ``work(conn, cur)``.

        Raises:
            RuntimeError: if the pool has not been created (connect() was not
                awaited, or close() was already called).
        """
        pool = self._pool
        if pool is None:
            raise RuntimeError(
                "OracleDatabase is not connected; await connect() first"
            )
        with pool.acquire() as conn:
            with conn.cursor() as cur:
                return work(conn, cur)

    async def _run(self, work):
        """Run ``work(conn, cur)`` in a worker thread and return its result."""
        return await asyncio.to_thread(self._with_cursor, work)

    async def _run_guarded(self, work, default, message, *args):
        """Like _run(), but log any failure and return ``default`` instead.

        Used by the best-effort methods whose callers expect a fallback value
        rather than an exception.
        """
        try:
            return await self._run(work)
        except Exception as exc:
            logger.exception(message, *args, exc)
            return default

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def connect(self):
        """Create the connection pool (1 to 4 connections, waiting getmode)."""
        # create_pool is a blocking call, so keep it off the event loop.
        self._pool = await asyncio.to_thread(
            oracledb.create_pool,
            user=self._user,
            password=self._password,
            dsn=self._dsn,
            min=1,
            max=4,
            increment=1,
            getmode=oracledb.POOL_GETMODE_WAIT,
        )

    async def close(self):
        """Close the pool if one exists and forget it."""
        pool = self._pool
        if pool:
            # Drop the reference first so later calls fail with a clear error
            # instead of touching a closed pool.
            self._pool = None
            await asyncio.to_thread(pool.close)

    # ------------------------------------------------------------------
    # VIN / salvage lookups
    # ------------------------------------------------------------------
    async def fetch_user(self, user_id: int):
        """Return the ``(id, name)`` row from ``users`` for ``user_id``, or None."""
        def _query(conn, cur):
            cur.execute(
                "SELECT id, name FROM users WHERE id = :id", {"id": user_id}
            )
            return cur.fetchone()

        return await self._run(_query)

    async def fetch_vin_info(self, vin: str) -> Tuple[str, str, str, int]:
        """Return ``(make, model, year, cnt)`` for a VIN.

        Each of make/model/year is taken from the NHTSA JSON table, then from
        ``vind2``, then falls back to ``'UNKNOWN'`` (all inside the SQL).
        ``cnt`` is the number of NHTSA rows for the first 10 VIN characters.
        """
        def _query(conn, cur):
            query = """
                select 'None',
                COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),
                (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make,
                COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),
                (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model,
                COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),
                (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year,
                (select count(*) from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin) cnt
                from (select substr(:vin,1,10) svin, :vin vin from dual) s
            """
            logger.debug("[DB DEBUG] Executing query for VIN: %s", vin)
            logger.debug("[DB DEBUG] SVIN will be: %s", vin[:10])
            cur.execute(query, {"vin": vin})
            row = cur.fetchone()
            logger.debug("[DB DEBUG] Raw result from database: %s", row)
            if not row:
                logger.debug("[DB DEBUG] No result found, returning defaults")
                return "UNKNOWN", "UNKNOWN", "UNKNOWN", 0
            # Column 0 is the literal 'None' placeholder selected by the query.
            _, make, model, year, cnt = row[:5]
            logger.debug(
                "[DB DEBUG] Parsed values - make: '%s', model: '%s', "
                "year: '%s', cnt: %s",
                make, model, year, cnt,
            )
            return make, model, year, cnt

        return await self._run(_query)

    async def count_salvage_records(self, vin: str) -> int:
        """Count rows in ``salvagedb.salvagedb`` for the given VIN."""
        def _query(conn, cur):
            cur.execute(
                "SELECT COUNT(*) FROM salvagedb.salvagedb WHERE vin = :vin and svin =substr(:vin,1,10)",
                {"vin": vin},
            )
            row = cur.fetchone()
            return row[0] if row else 0

        return await self._run(_query)

    async def fetch_salvage_detailed_info(self, vin: str) -> list:
        """Return damage/history records for a VIN as a list of dicts.

        Each dict has the keys ``odo``, ``odos``, ``dem1``, ``dem2``,
        ``sale_date``, ``j_rep_cost``, ``j_runs_drive``, ``j_locate`` and
        ``img_count``, ordered newest first by year and month.
        """
        columns = (
            'odo', 'odos', 'dem1', 'dem2', 'sale_date',
            'j_rep_cost', 'j_runs_drive', 'j_locate', 'img_count',
        )

        def _query(conn, cur):
            query = """
                SELECT
                    odo,
                    odos,
                    dem1,
                    dem2,
                    month||'/'||year as sale_date,
                    JSON_VALUE(jdata, '$.RepCost') AS j_rep_cost,
                    JSON_VALUE(jdata, '$.Runs_Drive') AS j_runs_drive,
                    JSON_VALUE(jdata, '$.Locate') AS j_locate,
                    (select count(*) from salvagedb.salvage_images si where si.vin = s.vin and fn =1) img_count
                FROM salvagedb.salvagedb s
                LEFT JOIN salvagedb.addinfo i ON s.num = i.numid
                WHERE vin = :vin AND svin = substr(:vin, 1, 10)
                ORDER BY year DESC, month DESC
            """
            cur.execute(query, {"vin": vin})
            # Convert positional rows into dictionaries keyed by column name.
            return [
                {name: row[idx] for idx, name in enumerate(columns)}
                for row in cur.fetchall()
            ]

        return await self._run(_query)

    async def fetch_photo_paths(self, vin: str) -> list:
        """Return the non-empty ``ipath`` values (``fn = 1``) for a VIN."""
        def _query(conn, cur):
            cur.execute(
                "SELECT ipath FROM salvagedb.salvage_images WHERE fn = 1 AND vin = :vin",
                {"vin": vin},
            )
            rows = cur.fetchall()
            return [row[0] for row in rows if row[0]] if rows else []

        return await self._run(_query)

    async def fetch_detailed_vin_info(self, vin: str) -> dict:
        """Return decoded NHTSA parameters for a VIN, grouped by category.

        The result has one dict per known category plus ``all_params``, a flat
        ``{key: value}`` mapping. Rows without a parameter name or value are
        skipped; rows whose category is not one of the known buckets appear
        only in ``all_params``.
        """
        def _query(conn, cur):
            query = """
                select dp.parameter_name,
                       dp.category_name,
                       d.value,
                       dp.endesc
                from salvagedb.m_JSONS_FROM_NHTSA d
                left join salvagedb.decode_params dp
                on d.variableid = dp.parameter_id
                where svin = substr(:vin, 1, 10)
                order by dp.category_level
            """
            cur.execute(query, {"vin": vin})
            rows = cur.fetchall()

            detailed_info = {name: {} for name in self._DETAIL_CATEGORIES}
            detailed_info['all_params'] = {}  # flat dictionary for easy access

            for param_name, category_name, value, description in rows:
                if not (param_name and value):
                    continue
                key = self._normalize_key(param_name)
                detailed_info['all_params'][key] = value
                if not category_name:
                    continue
                category_key = self._normalize_key(category_name)
                # 'all_params' also passes this check, exactly as in the
                # original membership test against the result dict.
                if category_key in detailed_info:
                    detailed_info[category_key][key] = {
                        'value': value,
                        'description': description,
                        'param_name': param_name,
                    }
            return detailed_info

        return await self._run(_query)

    async def count_photo_records(self, vin: str) -> int:
        """Count photos (``fn = 1``) stored for the given VIN."""
        def _query(conn, cur):
            cur.execute(
                "SELECT COUNT(*) FROM salvagedb.salvage_images WHERE vin = :vin AND fn = 1",
                {"vin": vin},
            )
            row = cur.fetchone()
            return row[0] if row else 0

        return await self._run(_query)

    # ------------------------------------------------------------------
    # Bot users and payments
    # ------------------------------------------------------------------
    async def save_user(self, user: "User", interaction_source: str = "bot") -> bool:
        """Insert or refresh a Telegram user in ``bot_users``.

        On first contact a row is created with ``interaction_count = 1``; on
        later contacts the profile fields are updated and the counter is
        incremented.

        Returns:
            True on success, False if any exception occurred (it is logged).
        """
        def _save_user(conn, cur):
            # Check whether the user already exists.
            cur.execute(
                "SELECT id FROM bot_users WHERE id = :user_id",
                {"user_id": user.id},
            )
            already_known = cur.fetchone()

            # Bind values shared by the UPDATE and the INSERT.
            params = {
                "user_id": user.id,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "username": user.username,
                "language_code": user.language_code,
                "is_premium": 1 if user.is_premium else 0,
                "added_to_attachment_menu": 1 if user.added_to_attachment_menu else 0,
            }

            if already_known:
                # Update the existing user.
                statement = """
                    UPDATE bot_users SET
                        first_name = :first_name,
                        last_name = :last_name,
                        username = :username,
                        language_code = :language_code,
                        is_premium = :is_premium,
                        added_to_attachment_menu = :added_to_attachment_menu,
                        last_interaction_date = SYSDATE,
                        interaction_count = interaction_count + 1
                    WHERE id = :user_id
                """
            else:
                # Create a new user.
                statement = """
                    INSERT INTO bot_users (
                        id, first_name, last_name, username, language_code,
                        is_bot, is_premium, added_to_attachment_menu,
                        registration_source, first_interaction_date,
                        last_interaction_date, interaction_count
                    ) VALUES (
                        :user_id, :first_name, :last_name, :username, :language_code,
                        :is_bot, :is_premium, :added_to_attachment_menu,
                        :registration_source, SYSDATE, SYSDATE, 1
                    )
                """
                params["is_bot"] = 1 if user.is_bot else 0
                params["registration_source"] = interaction_source

            cur.execute(statement, params)
            conn.commit()
            return True

        return await self._run_guarded(
            _save_user, False, "Error saving user %s: %s", user.id
        )

    async def update_user_payment(self, user_id: int, payment_amount: float) -> bool:
        """Add a successful payment to the user's running totals.

        Returns:
            True if a row was updated, False if no such user exists or an
            exception occurred (it is logged).
        """
        def _update_payment(conn, cur):
            # NVL guards against NULL totals: the INSERT in save_user does not
            # populate these columns, so unless the table defines defaults
            # they start out NULL and NULL + x would stay NULL.
            update_query = """
                UPDATE bot_users SET
                    total_payments = NVL(total_payments, 0) + :amount,
                    successful_payments_count = NVL(successful_payments_count, 0) + 1,
                    last_interaction_date = SYSDATE
                WHERE id = :user_id
            """
            cur.execute(
                update_query,
                {"user_id": user_id, "amount": payment_amount},
            )
            updated = cur.rowcount > 0
            conn.commit()
            return updated

        return await self._run_guarded(
            _update_payment, False,
            "Error updating payment for user %s: %s", user_id,
        )

    async def get_user_stats(self, user_id: int) -> Optional[dict]:
        """Return profile and usage statistics for one user.

        Returns:
            A dict of the selected ``bot_users`` columns, or None if the user
            is not found or an exception occurred (it is logged).
        """
        def _get_stats(conn, cur):
            query = """
                SELECT
                    first_name, last_name, username, language_code,
                    is_premium, interaction_count, total_payments,
                    successful_payments_count, first_interaction_date,
                    last_interaction_date
                FROM bot_users
                WHERE id = :user_id
            """
            cur.execute(query, {"user_id": user_id})
            row = cur.fetchone()
            if not row:
                return None
            return {
                "first_name": row[0],
                "last_name": row[1],
                "username": row[2],
                "language_code": row[3],
                "is_premium": bool(row[4]),
                "interaction_count": row[5],
                "total_payments": float(row[6]) if row[6] else 0.0,
                "successful_payments_count": row[7],
                "first_interaction_date": row[8],
                "last_interaction_date": row[9],
            }

        return await self._run_guarded(
            _get_stats, None, "Error getting stats for user %s: %s", user_id
        )

    async def get_users_summary(self) -> dict:
        """Return aggregate statistics over active users (``is_active = 1``).

        Returns an empty dict if an exception occurred (it is logged).
        """
        def _get_summary(conn, cur):
            summary_query = """
                SELECT
                    COUNT(*) as total_users,
                    COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
                    SUM(total_payments) as total_revenue,
                    SUM(successful_payments_count) as total_transactions,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_last_24h,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_last_week
                FROM bot_users
                WHERE is_active = 1
            """
            cur.execute(summary_query)
            row = cur.fetchone()
            return {
                "total_users": row[0] or 0,
                "premium_users": row[1] or 0,
                "total_revenue": float(row[2]) if row[2] else 0.0,
                "total_transactions": row[3] or 0,
                "active_last_24h": row[4] or 0,
                "active_last_week": row[5] or 0,
            }

        return await self._run_guarded(
            _get_summary, {}, "Error getting users summary: %s"
        )

    async def save_payment_log(self, user: "User", service_type: str, vin: str, payment_data: dict, service_result: dict = None) -> bool:
        """Record a payment operation in ``payment_logs``.

        Args:
            user: Telegram user object.
            service_type: Service kind ('decode_vin', 'check_salvage',
                'get_photos').
            vin: Vehicle VIN.
            payment_data: Payment details (amount, transaction_id, status,
                currency, refund_status, refund_reason).
            service_result: Outcome of the service (status, data_count,
                vehicle_make/model/year, error). When omitted or empty the
                service status is logged as 'pending'.

        Returns:
            True on success, False if any exception occurred (it is logged).
        """
        def _save_log(conn, cur):
            insert_query = """
                INSERT INTO payment_logs (
                    log_id, user_id, user_first_name, user_last_name, user_username,
                    user_language_code, user_is_premium, service_type, vin_number,
                    payment_amount, transaction_id, payment_status, payment_currency,
                    service_status, data_found_count, refund_status, refund_reason,
                    vehicle_make, vehicle_model, vehicle_year, error_message,
                    created_date, ip_address
                ) VALUES (
                    payment_logs_seq.NEXTVAL, :user_id, :user_first_name, :user_last_name, :user_username,
                    :user_language_code, :user_is_premium, :service_type, :vin_number,
                    :payment_amount, :transaction_id, :payment_status, :payment_currency,
                    :service_status, :data_found_count, :refund_status, :refund_reason,
                    :vehicle_make, :vehicle_model, :vehicle_year, :error_message,
                    SYSDATE, :ip_address
                )
            """
            params = {
                "user_id": user.id,
                "user_first_name": user.first_name,
                "user_last_name": user.last_name,
                "user_username": user.username,
                "user_language_code": user.language_code,
                "user_is_premium": 1 if user.is_premium else 0,
                "service_type": service_type,
                "vin_number": vin,
                "payment_amount": payment_data.get('amount', 0),
                "transaction_id": payment_data.get('transaction_id'),
                "payment_status": payment_data.get('status', 'completed'),
                "payment_currency": payment_data.get('currency', 'XTR'),
                "service_status": service_result.get('status', 'success') if service_result else 'pending',
                "data_found_count": service_result.get('data_count', 0) if service_result else 0,
                "refund_status": payment_data.get('refund_status', 'no_refund'),
                "refund_reason": payment_data.get('refund_reason'),
                "vehicle_make": service_result.get('vehicle_make') if service_result else None,
                "vehicle_model": service_result.get('vehicle_model') if service_result else None,
                "vehicle_year": service_result.get('vehicle_year') if service_result else None,
                "error_message": service_result.get('error') if service_result else None,
                "ip_address": None,  # Telegram does not expose IP addresses
            }
            cur.execute(insert_query, params)
            conn.commit()
            return True

        return await self._run_guarded(
            _save_log, False,
            "Error saving payment log for user %s: %s", user.id,
        )

    # ==============================================
    # USER ANALYTICS METHODS
    # ==============================================
    async def get_users_general_stats(self) -> dict:
        """Return overall user statistics across all of ``bot_users``.

        Returns an empty dict if an exception occurred (it is logged).
        """
        def _get_stats(conn, cur):
            query = """
                SELECT
                    COUNT(*) as total_users,
                    COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
                    COUNT(CASE WHEN is_active = 1 THEN 1 END) as active_users,
                    COUNT(CASE WHEN is_blocked = 1 THEN 1 END) as blocked_users,
                    COUNT(CASE WHEN successful_payments_count > 0 THEN 1 END) as paying_users,
                    SUM(total_payments) as total_revenue,
                    SUM(successful_payments_count) as total_transactions,
                    SUM(interaction_count) as total_interactions,
                    AVG(interaction_count) as avg_interactions_per_user,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_24h,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_7d,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 30 THEN 1 END) as active_30d
                FROM bot_users
            """
            cur.execute(query)
            row = cur.fetchone()
            return {
                "total_users": row[0] or 0,
                "premium_users": row[1] or 0,
                "active_users": row[2] or 0,
                "blocked_users": row[3] or 0,
                "paying_users": row[4] or 0,
                "total_revenue": float(row[5]) if row[5] else 0.0,
                "total_transactions": row[6] or 0,
                "total_interactions": row[7] or 0,
                "avg_interactions_per_user": round(float(row[8]), 2) if row[8] else 0.0,
                "active_24h": row[9] or 0,
                "active_7d": row[10] or 0,
                "active_30d": row[11] or 0,
            }

        return await self._run_guarded(
            _get_stats, {}, "Error getting general user stats: %s"
        )

    async def get_users_growth_stats(self) -> dict:
        """Return user-growth statistics based on ``created_at``.

        ``daily_growth`` holds at most the 10 most recent ``(day, count)``
        rows from the last 30 days. Returns an empty dict if an exception
        occurred (it is logged).
        """
        def _get_stats(conn, cur):
            query = """
                SELECT
                    COUNT(CASE WHEN created_at >= SYSDATE - 1 THEN 1 END) as new_today,
                    COUNT(CASE WHEN created_at >= SYSDATE - 7 THEN 1 END) as new_week,
                    COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_month,
                    COUNT(CASE WHEN created_at >= SYSDATE - 365 THEN 1 END) as new_year,
                    TO_CHAR(MIN(created_at), 'DD.MM.YYYY') as first_user_date,
                    ROUND((SYSDATE - MIN(created_at))) as days_since_start
                FROM bot_users
            """
            cur.execute(query)
            totals = cur.fetchone()

            # Per-day growth over the last 30 days.
            daily_query = """
                SELECT
                    TO_CHAR(created_at, 'DD.MM') as day,
                    COUNT(*) as count
                FROM bot_users
                WHERE created_at >= SYSDATE - 30
                GROUP BY TO_CHAR(created_at, 'DD.MM'), TRUNC(created_at)
                ORDER BY TRUNC(created_at) DESC
            """
            cur.execute(daily_query)
            daily_growth = cur.fetchall()

            return {
                "new_today": totals[0] or 0,
                "new_week": totals[1] or 0,
                "new_month": totals[2] or 0,
                "new_year": totals[3] or 0,
                "first_user_date": totals[4] or "N/A",
                "days_since_start": totals[5] or 0,
                "daily_growth": daily_growth[:10] if daily_growth else [],
            }

        return await self._run_guarded(
            _get_stats, {}, "Error getting growth stats: %s"
        )

    async def get_users_premium_stats(self) -> dict:
        """Compare premium and regular active users.

        Percentages and conversions are rounded to two decimals and are 0 when
        the corresponding denominator is zero. Returns an empty dict if an
        exception occurred (it is logged).
        """
        def _get_stats(conn, cur):
            query = """
                SELECT
                    COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
                    COUNT(CASE WHEN is_premium = 0 THEN 1 END) as regular_users,
                    AVG(CASE WHEN is_premium = 1 THEN total_payments END) as premium_avg_payment,
                    AVG(CASE WHEN is_premium = 0 THEN total_payments END) as regular_avg_payment,
                    AVG(CASE WHEN is_premium = 1 THEN interaction_count END) as premium_avg_interactions,
                    AVG(CASE WHEN is_premium = 0 THEN interaction_count END) as regular_avg_interactions,
                    COUNT(CASE WHEN is_premium = 1 AND successful_payments_count > 0 THEN 1 END) as premium_paying,
                    COUNT(CASE WHEN is_premium = 0 AND successful_payments_count > 0 THEN 1 END) as regular_paying
                FROM bot_users
                WHERE is_active = 1
            """
            cur.execute(query)
            row = cur.fetchone()
            premium_total = row[0] or 0
            regular_total = row[1] or 0
            everyone = premium_total + regular_total
            return {
                "premium_users": premium_total,
                "regular_users": regular_total,
                "premium_percentage": round((premium_total / everyone * 100), 2) if everyone > 0 else 0,
                "premium_avg_payment": round(float(row[2]), 2) if row[2] else 0.0,
                "regular_avg_payment": round(float(row[3]), 2) if row[3] else 0.0,
                "premium_avg_interactions": round(float(row[4]), 2) if row[4] else 0.0,
                "regular_avg_interactions": round(float(row[5]), 2) if row[5] else 0.0,
                "premium_paying": row[6] or 0,
                "regular_paying": row[7] or 0,
                "premium_conversion": round((row[6] / premium_total * 100), 2) if premium_total > 0 else 0,
                "regular_conversion": round((row[7] / regular_total * 100), 2) if regular_total > 0 else 0,
            }

        return await self._run_guarded(
            _get_stats, {}, "Error getting premium stats: %s"
        )

    async def get_users_geography_stats(self) -> dict:
        """Return language and registration-source distributions.

        ``top_languages`` is limited to the 10 most common language codes.
        Returns an empty dict if an exception occurred (it is logged).
        """
        def _get_stats(conn, cur):
            # Most common languages.
            lang_query = """
                SELECT
                    COALESCE(language_code, 'unknown') as language,
                    COUNT(*) as count,
                    ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM bot_users), 2) as percentage
                FROM bot_users
                GROUP BY language_code
                ORDER BY COUNT(*) DESC
            """
            cur.execute(lang_query)
            languages = cur.fetchall()

            # Registration-source statistics.
            source_query = """
                SELECT
                    registration_source,
                    COUNT(*) as count
                FROM bot_users
                GROUP BY registration_source
                ORDER BY COUNT(*) DESC
            """
            cur.execute(source_query)
            sources = cur.fetchall()

            return {
                "top_languages": languages[:10] if languages else [],
                "total_languages": len(languages) if languages else 0,
                "registration_sources": sources if sources else [],
            }

        return await self._run_guarded(
            _get_stats, {}, "Error getting geography stats: %s"
        )

    async def get_users_activity_stats(self) -> dict:
        """Return activity windows and interaction-count buckets for active users.

        Returns an empty dict if an exception occurred (it is logged).
        """
        def _get_stats(conn, cur):
            # Activity over several look-back periods.
            activity_query = """
                SELECT
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_1d,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 3 THEN 1 END) as active_3d,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_7d,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 14 THEN 1 END) as active_14d,
                    COUNT(CASE WHEN last_interaction_date >= SYSDATE - 30 THEN 1 END) as active_30d,
                    COUNT(CASE WHEN last_interaction_date < SYSDATE - 30 THEN 1 END) as inactive_30d
                FROM bot_users
                WHERE is_active = 1
            """
            cur.execute(activity_query)
            activity = cur.fetchone()

            # Distribution by number of interactions. The bucket labels are
            # runtime data returned to the caller and are kept verbatim.
            interaction_query = """
                SELECT
                    CASE
                        WHEN interaction_count = 1 THEN 'Новички (1)'
                        WHEN interaction_count BETWEEN 2 AND 5 THEN 'Начинающие (2-5)'
                        WHEN interaction_count BETWEEN 6 AND 20 THEN 'Активные (6-20)'
                        WHEN interaction_count BETWEEN 21 AND 100 THEN 'Постоянные (21-100)'
                        ELSE 'Суперактивные (100+)'
                    END as category,
                    COUNT(*) as count
                FROM bot_users
                WHERE is_active = 1
                GROUP BY
                    CASE
                        WHEN interaction_count = 1 THEN 'Новички (1)'
                        WHEN interaction_count BETWEEN 2 AND 5 THEN 'Начинающие (2-5)'
                        WHEN interaction_count BETWEEN 6 AND 20 THEN 'Активные (6-20)'
                        WHEN interaction_count BETWEEN 21 AND 100 THEN 'Постоянные (21-100)'
                        ELSE 'Суперактивные (100+)'
                    END
                ORDER BY COUNT(*) DESC
            """
            cur.execute(interaction_query)
            interactions = cur.fetchall()

            return {
                "active_1d": activity[0] or 0,
                "active_3d": activity[1] or 0,
                "active_7d": activity[2] or 0,
                "active_14d": activity[3] or 0,
                "active_30d": activity[4] or 0,
                "inactive_30d": activity[5] or 0,
                "interaction_distribution": interactions if interactions else [],
            }

        return await self._run_guarded(
            _get_stats, {}, "Error getting activity stats: %s"
        )

    async def get_users_sources_stats(self) -> dict:
        """Return per-source user statistics and the top referral sources.

        Referral sources are registration sources other than 'bot', 'direct'
        and 'unknown', limited to 10 rows. Returns an empty dict if an
        exception occurred (it is logged).
        """
        def _get_stats(conn, cur):
            # Statistics per registration source.
            sources_query = """
                SELECT
                    registration_source,
                    COUNT(*) as total_users,
                    COUNT(CASE WHEN successful_payments_count > 0 THEN 1 END) as paying_users,
                    AVG(total_payments) as avg_revenue,
                    AVG(interaction_count) as avg_interactions,
                    COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_last_30d
                FROM bot_users
                GROUP BY registration_source
                ORDER BY COUNT(*) DESC
            """
            cur.execute(sources_query)
            sources = cur.fetchall()

            # Top referral sources (if any).
            referral_query = """
                SELECT
                    registration_source,
                    COUNT(*) as count
                FROM bot_users
                WHERE registration_source NOT IN ('bot', 'direct', 'unknown')
                GROUP BY registration_source
                ORDER BY COUNT(*) DESC
                FETCH FIRST 10 ROWS ONLY
            """
            cur.execute(referral_query)
            referrals = cur.fetchall()

            return {
                "source_breakdown": sources if sources else [],
                "top_referrals": referrals if referrals else [],
            }

        return await self._run_guarded(
            _get_stats, {}, "Error getting sources stats: %s"
        )