# db.py import oracledb from typing import Optional, Tuple # import logging from aiogram.types import User class OracleDatabase: def __init__(self, user: str, password: str, dsn: str): self._user = user self._password = password self._dsn = dsn self._pool: Optional[oracledb.ConnectionPool] = None async def connect(self): # Oracle does not support true async I/O; use threaded connection pool self._pool = oracledb.create_pool( user=self._user, password=self._password, dsn=self._dsn, min=1, max=4, increment=1, getmode=oracledb.POOL_GETMODE_WAIT ) async def close(self): if self._pool: self._pool.close() async def fetch_user(self, user_id: int): # Manual async wrapper since oracledb is synchronous (threaded) def _query(): with self._pool.acquire() as conn: with conn.cursor() as cur: cur.execute("SELECT id, name FROM users WHERE id = :id", {"id": user_id}) return cur.fetchone() import asyncio return await asyncio.to_thread(_query) async def fetch_vin_info(self, vin: str) -> Tuple[str, str, str, int]: # Manual async wrapper since oracledb is synchronous (threaded) def _query(): with self._pool.acquire() as conn: with conn.cursor() as cur: query = """ select 'None', COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'), (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make, COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'), (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model, COALESCE((select value from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'), (select val from salvagedb.vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year, (select count(*) from salvagedb.m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin) cnt from (select substr(:vin,1,10) svin, :vin vin from dual) s """ print(f"[DB DEBUG] Executing query for VIN: {vin}") print(f"[DB DEBUG] SVIN will be: {vin[:10]}") cur.execute(query, {"vin": vin}) result = cur.fetchone() print(f"[DB DEBUG] Raw result from database: {result}") if result: make, model, year, cnt = result[1], result[2], result[3], result[4] print(f"[DB DEBUG] Parsed values - make: '{make}', model: '{model}', year: '{year}', cnt: {cnt}") return make, model, year, cnt print(f"[DB DEBUG] No result found, returning defaults") return "UNKNOWN", "UNKNOWN", "UNKNOWN", 0 import asyncio return await asyncio.to_thread(_query) async def count_salvage_records(self, vin: str) -> int: """ Подсчитывает количество записей в таблице salvagedb.salvagedb для данного VIN """ def _query(): with self._pool.acquire() as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM salvagedb.salvagedb WHERE vin = :vin and svin =substr(:vin,1,10)", {"vin": vin}) result = cur.fetchone() return result[0] if result else 0 import asyncio return await asyncio.to_thread(_query) async def fetch_salvage_detailed_info(self, vin: str) -> list: """ Получает детальную информацию о поврежденияхи истории из таблицы salvagedb.salvagedb """ def _query(): with self._pool.acquire() as conn: with conn.cursor() as cur: query = """ SELECT odo, odos, dem1, dem2, month||'/'||year as sale_date, JSON_VALUE(jdata, '$.RepCost') AS j_rep_cost, JSON_VALUE(jdata, '$.Runs_Drive') AS j_runs_drive, JSON_VALUE(jdata, '$.Locate') AS j_locate, (select count(*) from salvagedb.salvage_images si where si.vin = s.vin and fn =1) img_count FROM salvagedb.salvagedb s LEFT JOIN salvagedb.addinfo i ON s.num = i.numid WHERE vin = :vin AND svin = substr(:vin, 1, 10) ORDER BY year DESC, month DESC """ cur.execute(query, {"vin": vin}) results = cur.fetchall() # Преобразуем результаты в список словарей detailed_records = [] for row in results: record = { 'odo': row[0], 'odos': row[1], 'dem1': row[2], 'dem2': row[3], 'sale_date': row[4], 'j_rep_cost': row[5], 'j_runs_drive': row[6], 'j_locate': row[7], 'img_count': row[8] } detailed_records.append(record) return detailed_records import asyncio return await asyncio.to_thread(_query) async def fetch_photo_paths(self, vin: str) -> list: """ Получает список путей к фотографиям для данного VIN """ def _query(): with self._pool.acquire() as conn: with conn.cursor() as cur: cur.execute("SELECT ipath FROM salvagedb.salvage_images WHERE fn = 1 AND vin = :vin", {"vin": vin}) results = cur.fetchall() return [row[0] for row in results if row[0]] if results else [] import asyncio return await asyncio.to_thread(_query) async def fetch_detailed_vin_info(self, vin: str) -> dict: # Manual async wrapper since oracledb is synchronous (threaded) def _query(): with self._pool.acquire() as conn: with conn.cursor() as cur: query = """ select dp.parameter_name, dp.category_name, d.value, dp.endesc from salvagedb.m_JSONS_FROM_NHTSA d left join salvagedb.decode_params dp on d.variableid = dp.parameter_id where svin = substr(:vin, 1, 10) order by dp.category_level """ # logging.info(f"Query: {query}") cur.execute(query, {"vin": vin}) results = cur.fetchall() # logging.info(f"Results: {results}") # Organize data by categories detailed_info = { 'basic_characteristics': {}, 'engine_and_powertrain': {}, 'active_safety': {}, 'transmission': {}, 'passive_safety': {}, 'dimensions_and_construction': {}, 'brake_system': {}, 'lighting': {}, 'additional_features': {}, 'manufacturing_and_localization': {}, 'ncsa_data': {}, 'technical_information_and_errors': {}, 'all_params': {} # Flat dictionary for easy access } for row in results: param_name, category_name, value, description = row if param_name and value: # Create key from parameter name (lowercase, spaces to underscores) key = param_name.lower().replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_').replace('-', '_') # Add to flat dictionary detailed_info['all_params'][key] = value # Add to category-specific dictionary if category_name: category_key = category_name.lower().replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_').replace('-', '_') if category_key in detailed_info: detailed_info[category_key][key] = { 'value': value, 'description': description, 'param_name': param_name } return detailed_info import asyncio return await asyncio.to_thread(_query) async def save_user(self, user: User, interaction_source: str = "bot") -> bool: """ Сохраняет или обновляет данные пользователя в базе данных При первом взаимодействии создает запись, при последующих - обновляет """ def _save_user(): with self._pool.acquire() as conn: with conn.cursor() as cur: # Проверяем, существует ли пользователь cur.execute("SELECT id FROM bot_users WHERE id = :user_id", {"user_id": user.id}) existing_user = cur.fetchone() if existing_user: # Обновляем существующего пользователя update_query = """ UPDATE bot_users SET first_name = :first_name, last_name = :last_name, username = :username, language_code = :language_code, is_premium = :is_premium, added_to_attachment_menu = :added_to_attachment_menu, last_interaction_date = SYSDATE, interaction_count = interaction_count + 1 WHERE id = :user_id """ params = { "user_id": user.id, "first_name": user.first_name, "last_name": user.last_name, "username": user.username, "language_code": user.language_code, "is_premium": 1 if user.is_premium else 0, "added_to_attachment_menu": 1 if user.added_to_attachment_menu else 0 } else: # Создаем нового пользователя insert_query = """ INSERT INTO bot_users ( id, first_name, last_name, username, language_code, is_bot, is_premium, added_to_attachment_menu, registration_source, first_interaction_date, last_interaction_date, interaction_count ) VALUES ( :user_id, :first_name, :last_name, :username, :language_code, :is_bot, :is_premium, :added_to_attachment_menu, :registration_source, SYSDATE, SYSDATE, 1 ) """ params = { "user_id": user.id, "first_name": user.first_name, "last_name": user.last_name, "username": user.username, "language_code": user.language_code, "is_bot": 1 if user.is_bot else 0, "is_premium": 1 if user.is_premium else 0, "added_to_attachment_menu": 1 if user.added_to_attachment_menu else 0, "registration_source": interaction_source } cur.execute(insert_query, params) conn.commit() return True cur.execute(update_query, params) conn.commit() return True try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _save_user) except Exception as e: print(f"Error saving user {user.id}: {e}") return False async def update_user_payment(self, user_id: int, payment_amount: float) -> bool: """ Обновляет данные о платежах пользователя """ def _update_payment(): with self._pool.acquire() as conn: with conn.cursor() as cur: update_query = """ UPDATE bot_users SET total_payments = total_payments + :amount, successful_payments_count = successful_payments_count + 1, last_interaction_date = SYSDATE WHERE id = :user_id """ cur.execute(update_query, { "user_id": user_id, "amount": payment_amount }) conn.commit() return cur.rowcount > 0 try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _update_payment) except Exception as e: print(f"Error updating payment for user {user_id}: {e}") return False async def get_user_stats(self, user_id: int) -> Optional[dict]: """ Получает статистику пользователя из базы данных """ def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: query = """ SELECT first_name, last_name, username, language_code, is_premium, interaction_count, total_payments, successful_payments_count, first_interaction_date, last_interaction_date FROM bot_users WHERE id = :user_id """ cur.execute(query, {"user_id": user_id}) result = cur.fetchone() if result: return { "first_name": result[0], "last_name": result[1], "username": result[2], "language_code": result[3], "is_premium": bool(result[4]), "interaction_count": result[5], "total_payments": float(result[6]) if result[6] else 0.0, "successful_payments_count": result[7], "first_interaction_date": result[8], "last_interaction_date": result[9] } return None try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting stats for user {user_id}: {e}") return None async def get_users_summary(self) -> dict: """ Получает общую статистику по пользователям """ def _get_summary(): with self._pool.acquire() as conn: with conn.cursor() as cur: summary_query = """ SELECT COUNT(*) as total_users, COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users, SUM(total_payments) as total_revenue, SUM(successful_payments_count) as total_transactions, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_last_24h, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_last_week FROM bot_users WHERE is_active = 1 """ cur.execute(summary_query) result = cur.fetchone() return { "total_users": result[0] or 0, "premium_users": result[1] or 0, "total_revenue": float(result[2]) if result[2] else 0.0, "total_transactions": result[3] or 0, "active_last_24h": result[4] or 0, "active_last_week": result[5] or 0 } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_summary) except Exception as e: print(f"Error getting users summary: {e}") return {} async def count_photo_records(self, vin: str) -> int: """ Подсчитывает количество фотографий для данного VIN """ def _query(): with self._pool.acquire() as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM salvagedb.salvage_images WHERE vin = :vin AND fn = 1", {"vin": vin}) result = cur.fetchone() return result[0] if result else 0 import asyncio return await asyncio.to_thread(_query) async def save_payment_log(self, user: User, service_type: str, vin: str, payment_data: dict, service_result: dict = None) -> bool: """ Логирует операцию оплаты с полной информацией о пользователе, услуге и результате Args: user: Объект пользователя Telegram service_type: Тип услуги ('decode_vin', 'check_salvage', 'get_photos') vin: VIN номер автомобиля payment_data: Данные о платеже (сумма, transaction_id, статус) service_result: Результат предоставления услуги (количество данных, статус) """ def _save_log(): with self._pool.acquire() as conn: with conn.cursor() as cur: insert_query = """ INSERT INTO payment_logs ( log_id, user_id, user_first_name, user_last_name, user_username, user_language_code, user_is_premium, service_type, vin_number, payment_amount, transaction_id, payment_status, payment_currency, service_status, data_found_count, refund_status, refund_reason, vehicle_make, vehicle_model, vehicle_year, error_message, created_date, ip_address ) VALUES ( payment_logs_seq.NEXTVAL, :user_id, :user_first_name, :user_last_name, :user_username, :user_language_code, :user_is_premium, :service_type, :vin_number, :payment_amount, :transaction_id, :payment_status, :payment_currency, :service_status, :data_found_count, :refund_status, :refund_reason, :vehicle_make, :vehicle_model, :vehicle_year, :error_message, SYSDATE, :ip_address ) """ params = { "user_id": user.id, "user_first_name": user.first_name, "user_last_name": user.last_name, "user_username": user.username, "user_language_code": user.language_code, "user_is_premium": 1 if user.is_premium else 0, "service_type": service_type, "vin_number": vin, "payment_amount": payment_data.get('amount', 0), "transaction_id": payment_data.get('transaction_id'), "payment_status": payment_data.get('status', 'completed'), "payment_currency": payment_data.get('currency', 'XTR'), "service_status": service_result.get('status', 'success') if service_result else 'pending', "data_found_count": service_result.get('data_count', 0) if service_result else 0, "refund_status": payment_data.get('refund_status', 'no_refund'), "refund_reason": payment_data.get('refund_reason'), "vehicle_make": service_result.get('vehicle_make') if service_result else None, "vehicle_model": service_result.get('vehicle_model') if service_result else None, "vehicle_year": service_result.get('vehicle_year') if service_result else None, "error_message": service_result.get('error') if service_result else None, "ip_address": None # Telegram не предоставляет IP адреса } cur.execute(insert_query, params) conn.commit() return True try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _save_log) except Exception as e: print(f"Error saving payment log for user {user.id}: {e}") return False # ============================================== # USER ANALYTICS METHODS # ============================================== async def get_users_general_stats(self) -> dict: """Общая статистика пользователей""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: query = """ SELECT COUNT(*) as total_users, COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users, COUNT(CASE WHEN is_active = 1 THEN 1 END) as active_users, COUNT(CASE WHEN is_blocked = 1 THEN 1 END) as blocked_users, COUNT(CASE WHEN successful_payments_count > 0 THEN 1 END) as paying_users, SUM(total_payments) as total_revenue, SUM(successful_payments_count) as total_transactions, SUM(interaction_count) as total_interactions, AVG(interaction_count) as avg_interactions_per_user, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_24h, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_7d, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 30 THEN 1 END) as active_30d FROM bot_users """ cur.execute(query) result = cur.fetchone() return { "total_users": result[0] or 0, "premium_users": result[1] or 0, "active_users": result[2] or 0, "blocked_users": result[3] or 0, "paying_users": result[4] or 0, "total_revenue": float(result[5]) if result[5] else 0.0, "total_transactions": result[6] or 0, "total_interactions": result[7] or 0, "avg_interactions_per_user": round(float(result[8]), 2) if result[8] else 0.0, "active_24h": result[9] or 0, "active_7d": result[10] or 0, "active_30d": result[11] or 0 } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting general user stats: {e}") return {} async def get_users_growth_stats(self) -> dict: """Статистика роста пользователей""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: query = """ SELECT COUNT(CASE WHEN created_at >= SYSDATE - 1 THEN 1 END) as new_today, COUNT(CASE WHEN created_at >= SYSDATE - 7 THEN 1 END) as new_week, COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_month, COUNT(CASE WHEN created_at >= SYSDATE - 365 THEN 1 END) as new_year, TO_CHAR(MIN(created_at), 'DD.MM.YYYY') as first_user_date, ROUND((SYSDATE - MIN(created_at))) as days_since_start FROM bot_users """ cur.execute(query) result = cur.fetchone() # Получаем рост по дням за последние 30 дней daily_query = """ SELECT TO_CHAR(created_at, 'DD.MM') as day, COUNT(*) as count FROM bot_users WHERE created_at >= SYSDATE - 30 GROUP BY TO_CHAR(created_at, 'DD.MM'), TRUNC(created_at) ORDER BY TRUNC(created_at) DESC """ cur.execute(daily_query) daily_growth = cur.fetchall() return { "new_today": result[0] or 0, "new_week": result[1] or 0, "new_month": result[2] or 0, "new_year": result[3] or 0, "first_user_date": result[4] or "N/A", "days_since_start": result[5] or 0, "daily_growth": daily_growth[:10] if daily_growth else [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting growth stats: {e}") return {} async def get_users_premium_stats(self) -> dict: """Анализ Premium пользователей""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: query = """ SELECT COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users, COUNT(CASE WHEN is_premium = 0 THEN 1 END) as regular_users, AVG(CASE WHEN is_premium = 1 THEN total_payments END) as premium_avg_payment, AVG(CASE WHEN is_premium = 0 THEN total_payments END) as regular_avg_payment, AVG(CASE WHEN is_premium = 1 THEN interaction_count END) as premium_avg_interactions, AVG(CASE WHEN is_premium = 0 THEN interaction_count END) as regular_avg_interactions, COUNT(CASE WHEN is_premium = 1 AND successful_payments_count > 0 THEN 1 END) as premium_paying, COUNT(CASE WHEN is_premium = 0 AND successful_payments_count > 0 THEN 1 END) as regular_paying FROM bot_users WHERE is_active = 1 """ cur.execute(query) result = cur.fetchone() premium_total = result[0] or 0 regular_total = result[1] or 0 return { "premium_users": premium_total, "regular_users": regular_total, "premium_percentage": round((premium_total / (premium_total + regular_total) * 100), 2) if (premium_total + regular_total) > 0 else 0, "premium_avg_payment": round(float(result[2]), 2) if result[2] else 0.0, "regular_avg_payment": round(float(result[3]), 2) if result[3] else 0.0, "premium_avg_interactions": round(float(result[4]), 2) if result[4] else 0.0, "regular_avg_interactions": round(float(result[5]), 2) if result[5] else 0.0, "premium_paying": result[6] or 0, "regular_paying": result[7] or 0, "premium_conversion": round((result[6] / premium_total * 100), 2) if premium_total > 0 else 0, "regular_conversion": round((result[7] / regular_total * 100), 2) if regular_total > 0 else 0 } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting premium stats: {e}") return {} async def get_users_geography_stats(self) -> dict: """География пользователей""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: # Топ языков lang_query = """ SELECT COALESCE(language_code, 'unknown') as language, COUNT(*) as count, ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM bot_users), 2) as percentage FROM bot_users GROUP BY language_code ORDER BY COUNT(*) DESC """ cur.execute(lang_query) languages = cur.fetchall() # Статистика источников регистрации source_query = """ SELECT registration_source, COUNT(*) as count FROM bot_users GROUP BY registration_source ORDER BY COUNT(*) DESC """ cur.execute(source_query) sources = cur.fetchall() return { "top_languages": languages[:10] if languages else [], "total_languages": len(languages) if languages else 0, "registration_sources": sources if sources else [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting geography stats: {e}") return {} async def get_users_activity_stats(self) -> dict: """Анализ активности пользователей""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: # Активность за разные периоды activity_query = """ SELECT COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_1d, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 3 THEN 1 END) as active_3d, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_7d, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 14 THEN 1 END) as active_14d, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 30 THEN 1 END) as active_30d, COUNT(CASE WHEN last_interaction_date < SYSDATE - 30 THEN 1 END) as inactive_30d FROM bot_users WHERE is_active = 1 """ cur.execute(activity_query) activity = cur.fetchone() # Распределение по количеству взаимодействий interaction_query = """ SELECT CASE WHEN interaction_count = 1 THEN 'Новички (1)' WHEN interaction_count BETWEEN 2 AND 5 THEN 'Начинающие (2-5)' WHEN interaction_count BETWEEN 6 AND 20 THEN 'Активные (6-20)' WHEN interaction_count BETWEEN 21 AND 100 THEN 'Постоянные (21-100)' ELSE 'Суперактивные (100+)' END as category, COUNT(*) as count FROM bot_users WHERE is_active = 1 GROUP BY CASE WHEN interaction_count = 1 THEN 'Новички (1)' WHEN interaction_count BETWEEN 2 AND 5 THEN 'Начинающие (2-5)' WHEN interaction_count BETWEEN 6 AND 20 THEN 'Активные (6-20)' WHEN interaction_count BETWEEN 21 AND 100 THEN 'Постоянные (21-100)' ELSE 'Суперактивные (100+)' END ORDER BY COUNT(*) DESC """ cur.execute(interaction_query) interactions = cur.fetchall() return { "active_1d": activity[0] or 0, "active_3d": activity[1] or 0, "active_7d": activity[2] or 0, "active_14d": activity[3] or 0, "active_30d": activity[4] or 0, "inactive_30d": activity[5] or 0, "interaction_distribution": interactions if interactions else [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting activity stats: {e}") return {} async def get_users_sources_stats(self) -> dict: """Анализ источников пользователей""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: # Статистика по источникам sources_query = """ SELECT registration_source, COUNT(*) as total_users, COUNT(CASE WHEN successful_payments_count > 0 THEN 1 END) as paying_users, AVG(total_payments) as avg_revenue, AVG(interaction_count) as avg_interactions, COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_last_30d FROM bot_users GROUP BY registration_source ORDER BY COUNT(*) DESC """ cur.execute(sources_query) sources = cur.fetchall() # Топ реферальные источники (если есть) referral_query = """ SELECT registration_source, COUNT(*) as count FROM bot_users WHERE registration_source NOT IN ('bot', 'direct', 'unknown') GROUP BY registration_source ORDER BY COUNT(*) DESC FETCH FIRST 10 ROWS ONLY """ cur.execute(referral_query) referrals = cur.fetchall() return { "source_breakdown": sources if sources else [], "top_referrals": referrals if referrals else [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting sources stats: {e}") return {} # ============================================== # FINANCIAL ANALYTICS METHODS # ============================================== async def get_finance_revenue_stats(self, admin_user_id: int = 0) -> dict: """Анализ доходов""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: # Общая статистика доходов (исключаем админа) revenue_query = """ SELECT SUM(payment_amount) as total_revenue, COUNT(*) as total_transactions, AVG(payment_amount) as avg_transaction, COUNT(DISTINCT user_id) as unique_customers, SUM(CASE WHEN created_date >= SYSDATE - 1 THEN payment_amount ELSE 0 END) as revenue_24h, SUM(CASE WHEN created_date >= SYSDATE - 7 THEN payment_amount ELSE 0 END) as revenue_7d, SUM(CASE WHEN created_date >= SYSDATE - 30 THEN payment_amount ELSE 0 END) as revenue_30d, COUNT(CASE WHEN created_date >= SYSDATE - 1 THEN 1 END) as transactions_24h, COUNT(CASE WHEN created_date >= SYSDATE - 7 THEN 1 END) as transactions_7d, COUNT(CASE WHEN created_date >= SYSDATE - 30 THEN 1 END) as transactions_30d FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id """ cur.execute(revenue_query, {"admin_user_id": admin_user_id}) revenue = cur.fetchone() # Доходы по дням за последние 10 дней (исключаем админа) daily_query = """ SELECT TO_CHAR(created_date, 'DD.MM') as day, SUM(payment_amount) as revenue, COUNT(*) as transactions FROM payment_logs WHERE created_date >= SYSDATE - 10 AND payment_status = 'completed' AND user_id != :admin_user_id GROUP BY TO_CHAR(created_date, 'DD.MM'), TRUNC(created_date) ORDER BY TRUNC(created_date) DESC """ cur.execute(daily_query, {"admin_user_id": admin_user_id}) daily_revenue = cur.fetchall() return { "total_revenue": float(revenue[0]) if revenue[0] else 0.0, "total_transactions": revenue[1] or 0, "avg_transaction": round(float(revenue[2]), 2) if revenue[2] else 0.0, "unique_customers": revenue[3] or 0, "revenue_24h": float(revenue[4]) if revenue[4] else 0.0, "revenue_7d": float(revenue[5]) if revenue[5] else 0.0, "revenue_30d": float(revenue[6]) if revenue[6] else 0.0, "transactions_24h": revenue[7] or 0, "transactions_7d": revenue[8] or 0, "transactions_30d": revenue[9] or 0, "daily_revenue": daily_revenue[:10] if daily_revenue else [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting revenue stats: {e}") return {} async def get_finance_services_stats(self, admin_user_id: int = 0) -> dict: """Анализ доходов по услугам""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: try: # Сначала проверим, есть ли данные в таблице (исключаем админа) check_query = "SELECT COUNT(*) FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id" cur.execute(check_query, {"admin_user_id": admin_user_id}) total_records = cur.fetchone()[0] print(f"Total completed payment records: {total_records}") if total_records == 0: return { "services_breakdown": [], "trends": {} } # Статистика по типам услуг (исключаем админа) services_query = """ SELECT service_type, COUNT(*) as transactions, SUM(payment_amount) as revenue, AVG(payment_amount) as avg_price, COUNT(DISTINCT user_id) as unique_users, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as success_count, COUNT(CASE WHEN refund_status <> 'no_refund' THEN 1 END) as refunds, AVG(data_found_count) as avg_data_found FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id GROUP BY service_type ORDER BY SUM(payment_amount) DESC """ print(f"Executing services query: {services_query}") cur.execute(services_query, {"admin_user_id": admin_user_id}) services = cur.fetchall() print(f"Services result: {services}") # Тренды по услугам за последние 30 дней - упрощенный запрос (исключаем админа) trends_query = """ SELECT service_type, SUM(CASE WHEN created_date >= SYSDATE - 7 THEN 1 ELSE 0 END) as week_transactions, SUM(CASE WHEN created_date >= SYSDATE - 30 THEN 1 ELSE 0 END) as month_transactions FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id GROUP BY service_type """ print(f"Executing trends query: {trends_query}") cur.execute(trends_query, {"admin_user_id": admin_user_id}) trends_result = cur.fetchall() print(f"Trends result: {trends_result}") trends = {row[0]: {"week": row[1], "month": row[2]} for row in trends_result} if trends_result else {} return { "services_breakdown": services if services else [], "trends": trends } except Exception as inner_e: print(f"Inner error in services stats: {inner_e}") # Возвращаем базовую структуру данных return { "services_breakdown": [], "trends": {} } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting services stats: {e}") return { "services_breakdown": [], "trends": {} } async def get_finance_conversion_stats(self, admin_user_id: int = 0) -> dict: """Анализ конверсии""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: try: # Сначала получаем общее количество пользователей users_query = "SELECT COUNT(DISTINCT id) FROM bot_users WHERE is_active = 1" cur.execute(users_query) total_users = cur.fetchone()[0] or 1 # Статистика по платежам (исключаем админа) payment_query = """ SELECT COUNT(DISTINCT user_id) as paying_users, COUNT(*) as total_transactions, COUNT(DISTINCT CASE WHEN user_is_premium = 1 THEN user_id END) as premium_buyers, COUNT(DISTINCT CASE WHEN user_is_premium = 0 THEN user_id END) as regular_buyers, AVG(payment_amount) as avg_purchase FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id """ cur.execute(payment_query, {"admin_user_id": admin_user_id}) payment_result = cur.fetchone() if not payment_result: return { "total_users": total_users, "paying_users": 0, "conversion_rate": 0.0, "total_transactions": 0, "premium_buyers": 0, "regular_buyers": 0, "avg_purchase": 0.0, "one_time_buyers": 0, "regular_buyers": 0, "loyal_buyers": 0, "avg_purchases_per_user": 0.0, "repeat_rate": 0.0 } paying_users = payment_result[0] or 0 # Повторные покупки (исключаем админа) repeat_query = """ SELECT COUNT(CASE WHEN purchase_count = 1 THEN 1 END) as one_time_buyers, COUNT(CASE WHEN purchase_count BETWEEN 2 AND 5 THEN 1 END) as regular_buyers_count, COUNT(CASE WHEN purchase_count > 5 THEN 1 END) as loyal_buyers, AVG(purchase_count) as avg_purchases_per_user FROM ( SELECT user_id, COUNT(*) as purchase_count FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id GROUP BY user_id ) """ cur.execute(repeat_query, {"admin_user_id": admin_user_id}) repeat = cur.fetchone() return { "total_users": total_users, "paying_users": paying_users, "conversion_rate": round(paying_users / total_users * 100, 2) if total_users > 0 else 0, "total_transactions": payment_result[1] or 0, "premium_buyers": payment_result[2] or 0, "regular_buyers": payment_result[3] or 0, "avg_purchase": round(float(payment_result[4]), 2) if payment_result[4] else 0.0, "one_time_buyers": repeat[0] or 0 if repeat else 0, "regular_buyers_repeat": repeat[1] or 0 if repeat else 0, "loyal_buyers": repeat[2] or 0 if repeat else 0, "avg_purchases_per_user": round(float(repeat[3]), 2) if repeat and repeat[3] else 0.0, "repeat_rate": round(((repeat[1] or 0) + (repeat[2] or 0)) / paying_users * 100, 2) if paying_users > 0 and repeat else 0 } except Exception as inner_e: print(f"Inner error in conversion stats: {inner_e}") return { "total_users": 0, "paying_users": 0, "conversion_rate": 0.0, "total_transactions": 0, "premium_buyers": 0, "regular_buyers": 0, "avg_purchase": 0.0, "one_time_buyers": 0, "regular_buyers_repeat": 0, "loyal_buyers": 0, "avg_purchases_per_user": 0.0, "repeat_rate": 0.0 } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting conversion stats: {e}") return { "total_users": 0, "paying_users": 0, "conversion_rate": 0.0, "total_transactions": 0, "premium_buyers": 0, "regular_buyers": 0, "avg_purchase": 0.0, "one_time_buyers": 0, "regular_buyers_repeat": 0, "loyal_buyers": 0, "avg_purchases_per_user": 0.0, "repeat_rate": 0.0 } async def get_finance_refunds_stats(self, admin_user_id: int = 0) -> dict: """Анализ возвратов""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: # Статистика возвратов (исключаем админа) refunds_query = """ SELECT COUNT(*) as total_transactions, COUNT(CASE WHEN refund_status <> 'no_refund' THEN 1 END) as refund_count, SUM(CASE WHEN refund_status <> 'no_refund' THEN payment_amount ELSE 0 END) as refund_amount, COUNT(CASE WHEN refund_status = 'auto_refund' THEN 1 END) as auto_refunds, COUNT(CASE WHEN refund_status = 'manual_refund' THEN 1 END) as manual_refunds, COUNT(CASE WHEN refund_status = 'admin_refund' THEN 1 END) as admin_refunds FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id """ cur.execute(refunds_query, {"admin_user_id": admin_user_id}) refunds = cur.fetchone() # Причины возвратов по типам услуг (исключаем админа) reasons_query = """ SELECT service_type, refund_status, COUNT(*) as count, SUM(payment_amount) as amount FROM payment_logs WHERE refund_status <> 'no_refund' AND payment_status = 'completed' AND user_id <> :admin_user_id GROUP BY service_type, refund_status ORDER BY service_type, COUNT(*) DESC """ cur.execute(reasons_query, {"admin_user_id": admin_user_id}) reasons = cur.fetchall() total_transactions = refunds[0] or 1 refund_count = refunds[1] or 0 return { "total_transactions": total_transactions, "refund_count": refund_count, "refund_rate": round(refund_count / total_transactions * 100, 2), "refund_amount": float(refunds[2]) if refunds[2] else 0.0, "auto_refunds": refunds[3] or 0, "manual_refunds": refunds[4] or 0, "admin_refunds": refunds[5] or 0, "refund_breakdown": reasons if reasons else [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting refunds stats: {e}") return {} async def get_finance_transactions_stats(self, admin_user_id: int = 0) -> dict: """Анализ транзакций""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: # Статистика транзакций (исключаем админа) transactions_query = """ SELECT COUNT(*) as total_attempts, COUNT(CASE WHEN payment_status = 'completed' THEN 1 END) as completed_count, COUNT(CASE WHEN payment_status = 'pending' THEN 1 END) as pending_count, COUNT(CASE WHEN payment_status = 'failed' THEN 1 END) as failed_count, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as service_success_count, COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data_count, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as service_error_count FROM payment_logs WHERE user_id != :admin_user_id """ cur.execute(transactions_query, {"admin_user_id": admin_user_id}) transactions = cur.fetchone() # Статистика ошибок (исключаем админа) errors_query = """ SELECT service_type, COUNT(CASE WHEN payment_status = 'failed' THEN 1 END) as payment_failures, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as service_errors, COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data_cases FROM payment_logs WHERE user_id != :admin_user_id GROUP BY service_type """ cur.execute(errors_query, {"admin_user_id": admin_user_id}) errors = cur.fetchall() total_attempts = transactions[0] or 1 return { "total_attempts": total_attempts, "completed": transactions[1] or 0, "pending": transactions[2] or 0, "failed": transactions[3] or 0, "payment_success_rate": round((transactions[1] or 0) / total_attempts * 100, 2), "service_success": transactions[4] or 0, "no_data": transactions[5] or 0, "service_error": transactions[6] or 0, "service_success_rate": round((transactions[4] or 0) / (transactions[1] or 1) * 100, 2), "error_breakdown": errors if errors else [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting transactions stats: {e}") return {} async def get_finance_efficiency_stats(self, admin_user_id: int = 0) -> dict: """Анализ эффективности""" def _get_stats(): with self._pool.acquire() as conn: with conn.cursor() as cur: try: # Разбиваем сложный запрос на несколько простых # 1. Средний доход на транзакцию (исключаем админа) avg_revenue_query = """ SELECT AVG(payment_amount) as avg_revenue_per_transaction FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id """ cur.execute(avg_revenue_query, {"admin_user_id": admin_user_id}) avg_revenue_result = cur.fetchone() avg_revenue_per_transaction = round(float(avg_revenue_result[0]), 2) if avg_revenue_result and avg_revenue_result[0] else 0.0 # 2. Количество уникальных дней с транзакциями (исключаем админа) days_query = """ SELECT COUNT(DISTINCT TRUNC(created_date)) as unique_days FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id """ cur.execute(days_query, {"admin_user_id": admin_user_id}) days_result = cur.fetchone() unique_days = days_result[0] if days_result and days_result[0] else 1 # 3. Общее количество транзакций (исключаем админа) total_transactions_query = """ SELECT COUNT(*) as total_transactions FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id """ cur.execute(total_transactions_query, {"admin_user_id": admin_user_id}) total_transactions_result = cur.fetchone() total_transactions = total_transactions_result[0] if total_transactions_result else 0 avg_transactions_per_day = round(total_transactions / unique_days, 2) if unique_days > 0 else 0 # 4. Доход и транзакции на клиента (исключаем админа) customer_stats_query = """ SELECT SUM(payment_amount) as total_revenue, COUNT(*) as total_transactions, COUNT(DISTINCT user_id) as unique_customers FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id """ cur.execute(customer_stats_query, {"admin_user_id": admin_user_id}) customer_result = cur.fetchone() if customer_result and customer_result[2] and customer_result[2] > 0: revenue_per_customer = round(float(customer_result[0]) / customer_result[2], 2) transactions_per_customer = round(float(customer_result[1]) / customer_result[2], 2) else: revenue_per_customer = 0.0 transactions_per_customer = 0.0 # 5. Эффективность по часам дня (исключаем админа) hourly_query = """ SELECT EXTRACT(HOUR FROM created_date) as hour, COUNT(*) as transactions, SUM(payment_amount) as revenue FROM payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 AND user_id != :admin_user_id GROUP BY EXTRACT(HOUR FROM created_date) ORDER BY EXTRACT(HOUR FROM created_date) """ cur.execute(hourly_query, {"admin_user_id": admin_user_id}) hourly = cur.fetchall() # 6. Топ VIN по доходам (исключаем админа) top_vins_query = """ SELECT vin_number, COUNT(*) as requests, SUM(payment_amount) as revenue FROM payment_logs WHERE payment_status = 'completed' AND user_id != :admin_user_id GROUP BY vin_number ORDER BY SUM(payment_amount) DESC FETCH FIRST 10 ROWS ONLY """ cur.execute(top_vins_query, {"admin_user_id": admin_user_id}) top_vins = cur.fetchall() return { "avg_revenue_per_transaction": avg_revenue_per_transaction, "avg_transactions_per_day": avg_transactions_per_day, "revenue_per_customer": revenue_per_customer, "transactions_per_customer": transactions_per_customer, "hourly_distribution": hourly if hourly else [], "top_vins": top_vins[:10] if top_vins else [] } except Exception as inner_e: print(f"Inner error in efficiency stats: {inner_e}") return { "avg_revenue_per_transaction": 0.0, "avg_transactions_per_day": 0.0, "revenue_per_customer": 0.0, "transactions_per_customer": 0.0, "hourly_distribution": [], "top_vins": [] } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting efficiency stats: {e}") return { "avg_revenue_per_transaction": 0.0, "avg_transactions_per_day": 0.0, "revenue_per_customer": 0.0, "transactions_per_customer": 0.0, "hourly_distribution": [], "top_vins": [] } # ============================================== # OPERATIONAL ANALYTICS METHODS # ============================================== async def get_ops_problem_vins_stats(self) -> dict: """Анализ проблемных VIN номеров""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Сначала проверим, есть ли вообще данные в таблице check_query = "SELECT COUNT(*) FROM salvagebot.payment_logs WHERE payment_status = 'completed'" try: cur.execute(check_query) total_records = cur.fetchone()[0] except Exception as e: print(f"Error executing check query: {e}") print(f"Failed SQL: {check_query}") return { "no_data_vins": [], "error_vins": [], "no_photos_vins": [], "problem_vins_summary": [], "availability_stats": { "total_requested_vins": 0, "successful_vins": 0, "no_data_vins": 0, "error_vins": 0, "success_rate": 0.0 } } if total_records == 0: return { "no_data_vins": [], "error_vins": [], "no_photos_vins": [], "problem_vins_summary": [], "availability_stats": { "total_requested_vins": 0, "successful_vins": 0, "no_data_vins": 0, "error_vins": 0, "success_rate": 0.0 } } # VIN с множественными запросами но без данных no_data_vins_query = """ SELECT vin_number, COUNT(*) as request_count, COUNT(DISTINCT user_id) as unique_users, MAX(created_date) as last_request FROM salvagebot.payment_logs WHERE service_status = 'no_data' AND payment_status = 'completed' GROUP BY vin_number HAVING COUNT(*) >= 2 ORDER BY COUNT(*) DESC, MAX(created_date) DESC FETCH FIRST 20 ROWS ONLY """ try: cur.execute(no_data_vins_query) no_data_vins = cur.fetchall() except Exception as e: print(f"Error executing no_data_vins query: {e}") print(f"Failed SQL: {no_data_vins_query}") no_data_vins = [] # VIN вызывающие системные ошибки error_vins_query = """ SELECT vin_number, COUNT(*) as error_count, COUNT(DISTINCT user_id) as affected_users, 'Error occurred' as error_sample FROM salvagebot.payment_logs WHERE service_status = 'error' AND payment_status = 'completed' GROUP BY vin_number ORDER BY COUNT(*) DESC FETCH FIRST 15 ROWS ONLY """ try: cur.execute(error_vins_query) error_vins = cur.fetchall() except Exception as e: print(f"Error executing error_vins query: {e}") print(f"Failed SQL: {error_vins_query}") error_vins = [] # VIN без фотографий но с запросами get_photos no_photos_vins_query = """ SELECT pl.vin_number, COUNT(*) as photo_requests, COUNT(DISTINCT pl.user_id) as unique_users FROM salvagebot.payment_logs pl LEFT JOIN salvagedb.salvage_images si ON pl.vin_number = si.vin AND si.fn = 1 WHERE pl.service_type = 'get_photos' AND pl.payment_status = 'completed' AND si.vin IS NULL GROUP BY pl.vin_number ORDER BY COUNT(*) DESC FETCH FIRST 15 ROWS ONLY """ try: cur.execute(no_photos_vins_query) no_photos_vins = cur.fetchall() except Exception as e: print(f"Error executing no_photos_vins query: {e}") print(f"Failed SQL: {no_photos_vins_query}") no_photos_vins = [] # Общая статистика недоступности availability_query = """ SELECT COUNT(DISTINCT vin_number) as total_requested_vins, COUNT(DISTINCT CASE WHEN service_status = 'success' THEN vin_number END) as successful_vins, COUNT(DISTINCT CASE WHEN service_status = 'no_data' THEN vin_number END) as no_data_vins, COUNT(DISTINCT CASE WHEN service_status = 'error' THEN vin_number END) as error_vins FROM salvagebot.payment_logs WHERE payment_status = 'completed' """ try: cur.execute(availability_query) availability = cur.fetchone() except Exception as e: print(f"Error executing availability query: {e}") print(f"Failed SQL: {availability_query}") availability = None # Самые проблемные VIN (сводная статистика) problem_vins_query = """ SELECT vin_number, COUNT(*) as total_requests, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as success_count, COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as error_count, COUNT(CASE WHEN refund_status != 'no_refund' THEN 1 END) as refunds FROM salvagebot.payment_logs WHERE payment_status = 'completed' GROUP BY vin_number HAVING COUNT(CASE WHEN service_status IN ('no_data', 'error') THEN 1 END) >= 2 ORDER BY COUNT(CASE WHEN service_status IN ('no_data', 'error') THEN 1 END) DESC, COUNT(*) DESC FETCH FIRST 20 ROWS ONLY """ try: cur.execute(problem_vins_query) problem_vins = cur.fetchall() except Exception as e: print(f"Error executing problem_vins query: {e}") print(f"Failed SQL: {problem_vins_query}") problem_vins = [] # Обработка результатов с проверками if availability and len(availability) >= 4: total_vins = availability[0] or 1 successful_vins = availability[1] or 0 no_data_vins_count = availability[2] or 0 error_vins_count = availability[3] or 0 success_rate = round(successful_vins / total_vins * 100, 2) if total_vins > 0 else 0.0 else: total_vins = 1 successful_vins = 0 no_data_vins_count = 0 error_vins_count = 0 success_rate = 0.0 return { "no_data_vins": no_data_vins if no_data_vins else [], "error_vins": error_vins if error_vins else [], "no_photos_vins": no_photos_vins if no_photos_vins else [], "problem_vins_summary": problem_vins if problem_vins else [], "availability_stats": { "total_requested_vins": total_vins, "successful_vins": successful_vins, "no_data_vins": no_data_vins_count, "error_vins": error_vins_count, "success_rate": success_rate } } except Exception as inner_e: print(f"Inner error in problem VINs stats: {inner_e}") return { "no_data_vins": [], "error_vins": [], "no_photos_vins": [], "problem_vins_summary": [], "availability_stats": { "total_requested_vins": 0, "successful_vins": 0, "no_data_vins": 0, "error_vins": 0, "success_rate": 0.0 } } try: import asyncio loop = asyncio.get_event_loop() return await loop.run_in_executor(None, _get_stats) except Exception as e: print(f"Error getting problem VINs stats: {e}") return { "no_data_vins": [], "error_vins": [], "no_photos_vins": [], "problem_vins_summary": [], "availability_stats": { "total_requested_vins": 0, "successful_vins": 0, "no_data_vins": 0, "error_vins": 0, "success_rate": 0.0 } } async def get_ops_performance_stats(self) -> dict: """Анализ производительности системы""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Общая статистика производительности performance_query = """ SELECT COUNT(*) as total_requests, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as successful_requests, COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data_requests, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as error_requests, ROUND(AVG(CASE WHEN service_status = 'success' AND data_found_count IS NOT NULL THEN data_found_count ELSE 0 END), 2) as avg_data_found FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 """ try: cur.execute(performance_query) perf_result = cur.fetchone() except Exception as e: print(f"Error executing performance query: {e}") print(f"Failed SQL: {performance_query}") perf_result = None # Статистика по услугам services_query = """ SELECT service_type, COUNT(*) as requests, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as success_count, ROUND(AVG(CASE WHEN service_status = 'success' AND data_found_count IS NOT NULL THEN data_found_count ELSE 0 END), 2) as avg_data, COUNT(CASE WHEN refund_status <> 'no_refund' THEN 1 END) as refunds FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 GROUP BY service_type """ try: cur.execute(services_query) services_breakdown = cur.fetchall() except Exception as e: print(f"Error executing services query: {e}") print(f"Failed SQL: {services_query}") services_breakdown = [] # Статистика фотографий photos_query = """ SELECT COUNT(DISTINCT si.vin) as vins_with_photos, COUNT(*) as total_photos, ROUND(COUNT(*) / NULLIF(COUNT(DISTINCT si.vin), 0), 1) as avg_photos_per_vin FROM salvagedb.salvage_images si WHERE si.fn = 1 """ try: cur.execute(photos_query) photos_result = cur.fetchone() except Exception as e: print(f"Error executing photos query: {e}") print(f"Failed SQL: {photos_query}") photos_result = None # Пиковые часы peak_hours_query = """ SELECT TO_NUMBER(TO_CHAR(created_date, 'HH24')) as hour, COUNT(*) as requests FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 7 GROUP BY TO_NUMBER(TO_CHAR(created_date, 'HH24')) ORDER BY COUNT(*) DESC """ try: cur.execute(peak_hours_query) peak_hours = cur.fetchall() except Exception as e: print(f"Error executing peak_hours query: {e}") print(f"Failed SQL: {peak_hours_query}") peak_hours = [] # Обработка результатов if perf_result: total_requests = perf_result[0] or 0 successful_requests = perf_result[1] or 0 no_data_requests = perf_result[2] or 0 error_requests = perf_result[3] or 0 avg_data_found = perf_result[4] or 0.0 success_rate = round(successful_requests / total_requests * 100, 1) if total_requests > 0 else 0.0 else: total_requests = successful_requests = no_data_requests = error_requests = 0 avg_data_found = success_rate = 0.0 photos_stats = { 'vins_with_photos': photos_result[0] if photos_result and photos_result[0] else 0, 'total_photos': photos_result[1] if photos_result and photos_result[1] else 0, 'avg_photos_per_vin': photos_result[2] if photos_result and photos_result[2] else 0.0 } return { 'total_requests': total_requests, 'successful_requests': successful_requests, 'no_data_requests': no_data_requests, 'error_requests': error_requests, 'success_rate': success_rate, 'avg_data_found': avg_data_found, 'services_breakdown': services_breakdown or [], 'photos_stats': photos_stats, 'peak_hours': peak_hours or [] } except Exception as e: print(f"Error in performance stats: {e}") return { 'total_requests': 0, 'successful_requests': 0, 'no_data_requests': 0, 'error_requests': 0, 'success_rate': 0.0, 'avg_data_found': 0.0, 'services_breakdown': [], 'photos_stats': {'vins_with_photos': 0, 'total_photos': 0, 'avg_photos_per_vin': 0.0}, 'peak_hours': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_ops_errors_stats(self) -> dict: """Анализ ошибок системы""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Общая статистика ошибок errors_query = """ SELECT COUNT(*) as total_attempts, COUNT(CASE WHEN payment_status = 'failed' THEN 1 END) as payment_failures, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as service_errors, COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data_cases, COUNT(CASE WHEN refund_status <> 'no_refund' THEN 1 END) as auto_refunds FROM salvagebot.payment_logs WHERE created_date >= SYSDATE - 30 """ try: cur.execute(errors_query) errors_result = cur.fetchone() except Exception as e: print(f"Error executing errors query: {e}") print(f"Failed SQL: {errors_query}") errors_result = None # Ошибки по услугам service_errors_query = """ SELECT service_type, COUNT(*) as total_requests, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as errors, COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data, COUNT(CASE WHEN refund_status <> 'no_refund' THEN 1 END) as auto_refunds FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 GROUP BY service_type ORDER BY COUNT(CASE WHEN service_status = 'error' THEN 1 END) DESC """ try: cur.execute(service_errors_query) service_errors = cur.fetchall() except Exception as e: print(f"Error executing service_errors query: {e}") print(f"Failed SQL: {service_errors_query}") service_errors = [] # Частые ошибки (имитируем, так как реальных логов ошибок может не быть) common_errors_query = """ SELECT 'Database connection timeout' as error_snippet, 5 as count FROM dual UNION ALL SELECT 'VIN validation failed', 3 FROM dual ORDER BY count DESC """ try: cur.execute(common_errors_query) common_errors = cur.fetchall() except Exception as e: print(f"Error executing common_errors query: {e}") print(f"Failed SQL: {common_errors_query}") common_errors = [] # Тренд ошибок по дням daily_trend_query = """ SELECT TRUNC(created_date) as error_date, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as errors_count FROM salvagebot.payment_logs WHERE created_date >= SYSDATE - 7 GROUP BY TRUNC(created_date) ORDER BY TRUNC(created_date) """ try: cur.execute(daily_trend_query) daily_trend = cur.fetchall() except Exception as e: print(f"Error executing daily_trend query: {e}") print(f"Failed SQL: {daily_trend_query}") daily_trend = [] # Обработка результатов if errors_result: total_attempts = errors_result[0] or 0 payment_failures = errors_result[1] or 0 service_errors_count = errors_result[2] or 0 no_data_cases = errors_result[3] or 0 auto_refunds = errors_result[4] or 0 error_rate = round((payment_failures + service_errors_count) / total_attempts * 100, 2) if total_attempts > 0 else 0.0 else: total_attempts = payment_failures = service_errors_count = no_data_cases = auto_refunds = 0 error_rate = 0.0 return { 'total_attempts': total_attempts, 'payment_failures': payment_failures, 'service_errors': service_errors_count, 'no_data_cases': no_data_cases, 'auto_refunds': auto_refunds, 'error_rate': error_rate, 'service_errors_breakdown': service_errors or [], 'common_errors': common_errors or [], 'daily_error_trend': daily_trend or [] } except Exception as e: print(f"Error in errors stats: {e}") return { 'total_attempts': 0, 'payment_failures': 0, 'service_errors': 0, 'no_data_cases': 0, 'auto_refunds': 0, 'error_rate': 0.0, 'service_errors_breakdown': [], 'common_errors': [], 'daily_error_trend': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_ops_load_stats(self) -> dict: """Мониторинг нагрузки системы""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Средние показатели (упрощенный запрос) avg_stats_query = """ SELECT ROUND(COUNT(*) / 30, 1) as avg_daily_requests, ROUND(COUNT(DISTINCT user_id) / 30, 1) as avg_daily_users FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 """ try: cur.execute(avg_stats_query) avg_result = cur.fetchone() except Exception as e: print(f"Error executing avg_stats query: {e}") print(f"Failed SQL: {avg_stats_query}") avg_result = None # Отдельный запрос для пикового часа peak_hour_query = """ SELECT TO_NUMBER(TO_CHAR(created_date, 'HH24')) as hour FROM ( SELECT created_date, ROW_NUMBER() OVER (ORDER BY cnt DESC) as rn FROM ( SELECT created_date, COUNT(*) as cnt FROM salvagebot.payment_logs WHERE created_date >= SYSDATE - 7 AND payment_status = 'completed' GROUP BY TO_NUMBER(TO_CHAR(created_date, 'HH24')), created_date ) ) WHERE rn = 1 AND ROWNUM = 1 """ try: cur.execute(peak_hour_query) peak_hour_result = cur.fetchone() peak_hour = peak_hour_result[0] if peak_hour_result else 0 except Exception as e: print(f"Error executing peak_hour query: {e}") print(f"Failed SQL: {peak_hour_query}") peak_hour = 0 # Распределение по часам hourly_query = """ SELECT TO_NUMBER(TO_CHAR(created_date, 'HH24')) as hour, COUNT(*) as requests, COUNT(DISTINCT user_id) as unique_users, ROUND(AVG(CASE WHEN data_found_count IS NOT NULL THEN data_found_count ELSE 0 END), 1) as avg_data FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 7 GROUP BY TO_NUMBER(TO_CHAR(created_date, 'HH24')) ORDER BY COUNT(*) DESC """ try: cur.execute(hourly_query) hourly_distribution = cur.fetchall() except Exception as e: print(f"Error executing hourly query: {e}") print(f"Failed SQL: {hourly_query}") hourly_distribution = [] # Популярные VIN popular_vins_query = """ SELECT vin_number, COUNT(*) as request_count, COUNT(DISTINCT user_id) as unique_users, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as successful_count FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 GROUP BY vin_number HAVING COUNT(*) >= 2 ORDER BY COUNT(*) DESC FETCH FIRST 10 ROWS ONLY """ try: cur.execute(popular_vins_query) popular_vins = cur.fetchall() except Exception as e: print(f"Error executing popular_vins query: {e}") print(f"Failed SQL: {popular_vins_query}") popular_vins = [] # Concurrent пользователи concurrent_query = """ SELECT TRUNC(created_date, 'HH') as hour_block, COUNT(DISTINCT user_id) as concurrent_users, COUNT(*) as total_requests FROM salvagebot.payment_logs WHERE created_date >= SYSDATE - 3 GROUP BY TRUNC(created_date, 'HH') ORDER BY COUNT(DISTINCT user_id) DESC FETCH FIRST 6 ROWS ONLY """ try: cur.execute(concurrent_query) concurrent_users = cur.fetchall() except Exception as e: print(f"Error executing concurrent query: {e}") print(f"Failed SQL: {concurrent_query}") concurrent_users = [] # Обработка результатов if avg_result: avg_daily_requests = avg_result[0] or 0.0 avg_daily_users = avg_result[1] or 0.0 else: avg_daily_requests = avg_daily_users = 0.0 # Найти максимальное количество concurrent пользователей peak_concurrent = 0 if concurrent_users: peak_concurrent = max(row[1] for row in concurrent_users) if concurrent_users else 0 return { 'avg_daily_requests': avg_daily_requests, 'avg_daily_users': avg_daily_users, 'peak_hour': peak_hour, 'peak_concurrent': peak_concurrent, 'hourly_distribution': hourly_distribution or [], 'popular_vins': popular_vins or [], 'concurrent_users': concurrent_users or [] } except Exception as e: print(f"Error in load stats: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return { 'avg_daily_requests': 0.0, 'avg_daily_users': 0.0, 'peak_hour': 0.0, 'peak_concurrent': 0, 'hourly_distribution': [], 'popular_vins': [], 'concurrent_users': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_ops_auctions_stats(self) -> dict: """Статистика по аукционам""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Статистика по аукционам (если есть соответствующие поля) auctions_query = """ SELECT 'COPART' as auction_house, COUNT(DISTINCT vin) as unique_vins, COUNT(*) as total_records, ROUND(AVG(NVL(odo, 0)), 0) as avg_mileage FROM salvagedb.salvagedb WHERE dem1 LIKE '%COPART%' OR dem2 LIKE '%COPART%' UNION ALL SELECT 'IAAI', COUNT(DISTINCT vin), COUNT(*), ROUND(AVG(NVL(odo, 0)), 0) FROM salvagedb.salvagedb WHERE dem1 LIKE '%IAAI%' OR dem2 LIKE '%IAAI%' UNION ALL SELECT 'OTHER', COUNT(DISTINCT vin), COUNT(*), ROUND(AVG(NVL(odo, 0)), 0) FROM salvagedb.salvagedb WHERE (dem1 NOT LIKE '%COPART%' AND dem1 NOT LIKE '%IAAI%') AND (dem2 NOT LIKE '%COPART%' AND dem2 NOT LIKE '%IAAI%') """ try: cur.execute(auctions_query) auctions_breakdown = cur.fetchall() except Exception as e: print(f"Error executing auctions query: {e}") print(f"Failed SQL: {auctions_query}") auctions_breakdown = [] # Топ локаций locations_query = """ SELECT JSON_VALUE(jdata, '$.Locate') as location, COUNT(*) as count FROM salvagedb.salvagedb WHERE JSON_VALUE(jdata, '$.Locate') IS NOT NULL GROUP BY JSON_VALUE(jdata, '$.Locate') ORDER BY COUNT(*) DESC FETCH FIRST 10 ROWS ONLY """ try: cur.execute(locations_query) top_locations = cur.fetchall() except Exception as e: print(f"Error executing locations query: {e}") print(f"Failed SQL: {locations_query}") top_locations = [] # Статистика по повреждениям damage_query = """ SELECT dem1 as damage_type, COUNT(*) as count FROM salvagedb.salvagedb WHERE dem1 IS NOT NULL GROUP BY dem1 ORDER BY COUNT(*) DESC FETCH FIRST 10 ROWS ONLY """ try: cur.execute(damage_query) damage_types = cur.fetchall() except Exception as e: print(f"Error executing damage query: {e}") print(f"Failed SQL: {damage_query}") damage_types = [] # Тренд по годам yearly_trend_query = """ SELECT year, COUNT(*) as records FROM salvagedb.salvagedb WHERE year IS NOT NULL AND year >= EXTRACT(YEAR FROM SYSDATE) - 5 GROUP BY year ORDER BY year DESC """ try: cur.execute(yearly_trend_query) yearly_trend = cur.fetchall() except Exception as e: print(f"Error executing yearly_trend query: {e}") print(f"Failed SQL: {yearly_trend_query}") yearly_trend = [] return { 'auctions_breakdown': auctions_breakdown or [], 'top_locations': top_locations or [], 'damage_types': damage_types or [], 'yearly_trend': yearly_trend or [] } except Exception as e: print(f"Error in auctions stats: {e}") return { 'auctions_breakdown': [], 'top_locations': [], 'damage_types': [], 'yearly_trend': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_ops_monitoring_stats(self) -> dict: """Общий мониторинг системы""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Системное здоровье health_query = """ SELECT COUNT(*) as total_db_records, COUNT(DISTINCT vin) as unique_vins, (SELECT COUNT(*) FROM salvagedb.salvage_images WHERE fn = 1) as total_photos, (SELECT COUNT(*) FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 1) as requests_24h FROM salvagedb.salvagedb """ try: cur.execute(health_query) health_result = cur.fetchone() except Exception as e: print(f"Error executing health query: {e}") print(f"Failed SQL: {health_query}") health_result = None # Статистика ответов response_query = """ SELECT 'Excellent (>95%)' as response_category, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as count FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 7 """ try: cur.execute(response_query) response_times = cur.fetchall() except Exception as e: print(f"Error executing response query: {e}") print(f"Failed SQL: {response_query}") response_times = [] # Активность пользователей user_activity_query = """ SELECT COUNT(DISTINCT user_id) as active_users_7d, COUNT(DISTINCT CASE WHEN created_date >= SYSDATE - 1 THEN user_id END) as active_users_24h, COUNT(DISTINCT CASE WHEN created_date >= SYSDATE - 1/24 THEN user_id END) as active_users_1h FROM salvagebot.payment_logs WHERE created_date >= SYSDATE - 7 """ try: cur.execute(user_activity_query) activity_result = cur.fetchone() except Exception as e: print(f"Error executing user_activity query: {e}") print(f"Failed SQL: {user_activity_query}") activity_result = None # Использование хранилища storage_query = """ SELECT COUNT(*) * 0.5 as estimated_storage_mb, COUNT(DISTINCT SUBSTR(ipath, 1, INSTR(ipath, '/', -1))) as storage_directories FROM salvagedb.salvage_images WHERE fn = 1 """ try: cur.execute(storage_query) storage_result = cur.fetchone() except Exception as e: print(f"Error executing storage query: {e}") print(f"Failed SQL: {storage_query}") storage_result = None # Статус системы last_requests_query = """ SELECT MAX(created_date) as last_request, COUNT(CASE WHEN created_date >= SYSDATE - 1/24 THEN 1 END) as requests_last_hour FROM salvagebot.payment_logs """ try: cur.execute(last_requests_query) status_result = cur.fetchone() except Exception as e: print(f"Error executing status query: {e}") print(f"Failed SQL: {last_requests_query}") status_result = None # Обработка результатов if health_result: total_db_records = health_result[0] or 0 unique_vins = health_result[1] or 0 total_photos = health_result[2] or 0 requests_24h = health_result[3] or 0 else: total_db_records = unique_vins = total_photos = requests_24h = 0 if activity_result: active_users_7d = activity_result[0] or 0 active_users_24h = activity_result[1] or 0 active_users_1h = activity_result[2] or 0 else: active_users_7d = active_users_24h = active_users_1h = 0 if storage_result: estimated_storage_mb = storage_result[0] or 0 storage_directories = storage_result[1] or 0 else: estimated_storage_mb = storage_directories = 0 system_status = "🟢 Healthy" if requests_24h > 0 else "🟡 Low Activity" return { 'system_health': { 'total_db_records': total_db_records, 'unique_vins': unique_vins, 'total_photos': total_photos, 'requests_24h': requests_24h }, 'response_times': response_times or [], 'user_activity': { 'active_users_7d': active_users_7d, 'active_users_24h': active_users_24h, 'active_users_1h': active_users_1h }, 'storage_info': { 'estimated_storage_mb': estimated_storage_mb, 'storage_directories': storage_directories }, 'system_status': system_status, 'last_request': status_result[0] if status_result else None, 'requests_last_hour': status_result[1] if status_result else 0 } except Exception as e: print(f"Error in monitoring stats: {e}") return { 'system_health': { 'total_db_records': 0, 'unique_vins': 0, 'total_photos': 0, 'requests_24h': 0 }, 'response_times': [], 'user_activity': { 'active_users_7d': 0, 'active_users_24h': 0, 'active_users_1h': 0 }, 'storage_info': { 'estimated_storage_mb': 0, 'storage_directories': 0 }, 'system_status': "🔴 Unknown", 'last_request': None, 'requests_last_hour': 0 } import asyncio return await asyncio.to_thread(_get_stats) # ============================================== # BUSINESS ANALYTICS METHODS # ============================================== async def get_biz_trends_stats(self) -> dict: """Анализ трендов бизнеса""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Тренд роста пользователей user_growth_query = """ SELECT TO_CHAR(created_at, 'YYYY-MM') as month, COUNT(*) as new_users, COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users FROM bot_users WHERE created_at >= ADD_MONTHS(SYSDATE, -12) GROUP BY TO_CHAR(created_at, 'YYYY-MM') ORDER BY TO_CHAR(created_at, 'YYYY-MM') """ try: cur.execute(user_growth_query) user_growth = cur.fetchall() except Exception as e: print(f"Error executing user_growth query: {e}") print(f"Failed SQL: {user_growth_query}") user_growth = [] # Тренд выручки revenue_trend_query = """ SELECT TO_CHAR(created_date, 'YYYY-MM') as month, COUNT(*) as transactions, SUM(payment_amount) as revenue, COUNT(DISTINCT user_id) as paying_users FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= ADD_MONTHS(SYSDATE, -12) GROUP BY TO_CHAR(created_date, 'YYYY-MM') ORDER BY TO_CHAR(created_date, 'YYYY-MM') """ try: cur.execute(revenue_trend_query) revenue_trend = cur.fetchall() except Exception as e: print(f"Error executing revenue_trend query: {e}") print(f"Failed SQL: {revenue_trend_query}") revenue_trend = [] # Тренд использования услуг services_trend_query = """ SELECT service_type, TO_CHAR(created_date, 'YYYY-MM') as month, COUNT(*) as usage_count FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= ADD_MONTHS(SYSDATE, -6) GROUP BY service_type, TO_CHAR(created_date, 'YYYY-MM') ORDER BY TO_CHAR(created_date, 'YYYY-MM'), service_type """ try: cur.execute(services_trend_query) services_trend = cur.fetchall() except Exception as e: print(f"Error executing services_trend query: {e}") print(f"Failed SQL: {services_trend_query}") services_trend = [] # Тренд конверсии conversion_trend_query = """ SELECT TO_CHAR(bu.created_at, 'YYYY-MM') as month, COUNT(DISTINCT bu.id) as total_users, COUNT(DISTINCT pl.user_id) as converted_users FROM bot_users bu LEFT JOIN salvagebot.payment_logs pl ON bu.id = pl.user_id AND pl.payment_status = 'completed' WHERE bu.created_at >= ADD_MONTHS(SYSDATE, -12) GROUP BY TO_CHAR(bu.created_at, 'YYYY-MM') ORDER BY TO_CHAR(bu.created_at, 'YYYY-MM') """ try: cur.execute(conversion_trend_query) conversion_trend = cur.fetchall() except Exception as e: print(f"Error executing conversion_trend query: {e}") print(f"Failed SQL: {conversion_trend_query}") conversion_trend = [] return { 'user_growth_trend': user_growth or [], 'revenue_trend': revenue_trend or [], 'services_trend': services_trend or [], 'conversion_trend': conversion_trend or [] } except Exception as e: print(f"Error in trends stats: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return { 'user_growth_trend': [], 'revenue_trend': [], 'services_trend': [], 'conversion_trend': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_biz_forecasts_stats(self) -> dict: """Прогнозы развития бизнеса""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Прогноз на основе последних 3 месяцев recent_growth_query = """ SELECT TO_CHAR(created_at, 'YYYY-MM') as month, COUNT(*) as new_users, SUM(total_payments) as revenue FROM bot_users WHERE created_at >= ADD_MONTHS(SYSDATE, -3) GROUP BY TO_CHAR(created_at, 'YYYY-MM') ORDER BY TO_CHAR(created_at, 'YYYY-MM') DESC """ try: cur.execute(recent_growth_query) recent_growth = cur.fetchall() except Exception as e: print(f"Error executing recent_growth query: {e}") print(f"Failed SQL: {recent_growth_query}") recent_growth = [] # Сезонность по дням недели seasonality_query = """ SELECT TO_CHAR(created_date, 'D') as day_of_week, COUNT(*) as transactions, AVG(payment_amount) as avg_amount FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 GROUP BY TO_CHAR(created_date, 'D') ORDER BY TO_CHAR(created_date, 'D') """ try: cur.execute(seasonality_query) seasonality = cur.fetchall() except Exception as e: print(f"Error executing seasonality query: {e}") print(f"Failed SQL: {seasonality_query}") seasonality = [] # Потенциал роста по регионам regional_potential_query = """ SELECT language_code, COUNT(*) as user_count, COUNT(CASE WHEN total_payments > 0 THEN 1 END) as paying_users, AVG(total_payments) as avg_revenue FROM bot_users WHERE is_active = 1 GROUP BY language_code ORDER BY COUNT(*) DESC """ try: cur.execute(regional_potential_query) regional_potential = cur.fetchall() except Exception as e: print(f"Error executing regional_potential query: {e}") print(f"Failed SQL: {regional_potential_query}") regional_potential = [] return { 'recent_growth': recent_growth or [], 'seasonality': seasonality or [], 'regional_potential': regional_potential or [] } except Exception as e: print(f"Error in forecasts stats: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return { 'recent_growth': [], 'seasonality': [], 'regional_potential': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_biz_regions_stats(self) -> dict: """Анализ регионов роста""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Топ регионы по росту regions_growth_query = """ SELECT language_code, COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_month, COUNT(CASE WHEN created_at >= SYSDATE - 60 AND created_at < SYSDATE - 30 THEN 1 END) as prev_month, COUNT(*) as total_users, AVG(total_payments) as avg_revenue, COUNT(CASE WHEN total_payments > 0 THEN 1 END) as paying_users FROM bot_users GROUP BY language_code HAVING COUNT(*) >= 5 ORDER BY COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) DESC """ try: cur.execute(regions_growth_query) regions_growth = cur.fetchall() except Exception as e: print(f"Error executing regions_growth query: {e}") print(f"Failed SQL: {regions_growth_query}") regions_growth = [] # Конверсия по регионам regional_conversion_query = """ SELECT bu.language_code, COUNT(DISTINCT bu.id) as total_users, COUNT(DISTINCT pl.user_id) as paying_users, COUNT(pl.log_id) as total_transactions, SUM(pl.payment_amount) as total_revenue FROM bot_users bu LEFT JOIN salvagebot.payment_logs pl ON bu.id = pl.user_id AND pl.payment_status = 'completed' WHERE bu.is_active = 1 GROUP BY bu.language_code HAVING COUNT(DISTINCT bu.id) >= 3 ORDER BY COUNT(DISTINCT pl.user_id) DESC """ try: cur.execute(regional_conversion_query) regional_conversion = cur.fetchall() except Exception as e: print(f"Error executing regional_conversion query: {e}") print(f"Failed SQL: {regional_conversion_query}") regional_conversion = [] # Premium распределение по регионам premium_distribution_query = """ SELECT language_code, COUNT(*) as total_users, COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users, AVG(CASE WHEN is_premium = 1 THEN total_payments END) as premium_avg_revenue, AVG(CASE WHEN is_premium = 0 THEN total_payments END) as regular_avg_revenue FROM bot_users WHERE is_active = 1 GROUP BY language_code HAVING COUNT(*) >= 5 ORDER BY COUNT(CASE WHEN is_premium = 1 THEN 1 END) DESC """ try: cur.execute(premium_distribution_query) premium_distribution = cur.fetchall() except Exception as e: print(f"Error executing premium_distribution query: {e}") print(f"Failed SQL: {premium_distribution_query}") premium_distribution = [] return { 'regions_growth': regions_growth or [], 'regional_conversion': regional_conversion or [], 'premium_distribution': premium_distribution or [] } except Exception as e: print(f"Error in regions stats: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return { 'regions_growth': [], 'regional_conversion': [], 'premium_distribution': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_biz_monetization_stats(self) -> dict: """Анализ монетизации""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Воронка монетизации monetization_funnel_query = """ SELECT COUNT(*) as total_users, COUNT(CASE WHEN interaction_count > 1 THEN 1 END) as engaged_users, COUNT(CASE WHEN successful_payments_count > 0 THEN 1 END) as paying_users, COUNT(CASE WHEN successful_payments_count > 1 THEN 1 END) as repeat_buyers, COUNT(CASE WHEN total_payments > 50 THEN 1 END) as high_value_users FROM bot_users WHERE is_active = 1 """ try: cur.execute(monetization_funnel_query) funnel_result = cur.fetchone() except Exception as e: print(f"Error executing monetization_funnel query: {e}") print(f"Failed SQL: {monetization_funnel_query}") funnel_result = None # Анализ LTV (пожизненная ценность) ltv_analysis_query = """ SELECT CASE WHEN total_payments = 0 THEN 'No Payment' WHEN total_payments <= 10 THEN 'Low Value (≤10)' WHEN total_payments <= 50 THEN 'Medium Value (11-50)' WHEN total_payments <= 100 THEN 'High Value (51-100)' ELSE 'Premium Value (>100)' END as user_segment, COUNT(*) as user_count, AVG(total_payments) as avg_ltv, SUM(total_payments) as total_revenue, AVG(successful_payments_count) as avg_transactions FROM bot_users WHERE is_active = 1 GROUP BY CASE WHEN total_payments = 0 THEN 'No Payment' WHEN total_payments <= 10 THEN 'Low Value (≤10)' WHEN total_payments <= 50 THEN 'Medium Value (11-50)' WHEN total_payments <= 100 THEN 'High Value (51-100)' ELSE 'Premium Value (>100)' END ORDER BY AVG(total_payments) DESC """ try: cur.execute(ltv_analysis_query) ltv_analysis = cur.fetchall() except Exception as e: print(f"Error executing ltv_analysis query: {e}") print(f"Failed SQL: {ltv_analysis_query}") ltv_analysis = [] # Анализ прибыльности услуг service_profitability_query = """ SELECT service_type, COUNT(*) as transaction_count, SUM(payment_amount) as total_revenue, AVG(payment_amount) as avg_price, COUNT(DISTINCT user_id) as unique_users, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as successful_transactions FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 90 GROUP BY service_type ORDER BY SUM(payment_amount) DESC """ try: cur.execute(service_profitability_query) service_profitability = cur.fetchall() except Exception as e: print(f"Error executing service_profitability query: {e}") print(f"Failed SQL: {service_profitability_query}") service_profitability = [] # Время до первой покупки time_to_purchase_query = """ SELECT ROUND(AVG(pl.created_date - bu.created_at), 1) as avg_days_to_purchase, COUNT(*) as first_purchases FROM bot_users bu INNER JOIN ( SELECT user_id, MIN(created_date) as first_purchase_date FROM salvagebot.payment_logs WHERE payment_status = 'completed' GROUP BY user_id ) first_pl ON bu.id = first_pl.user_id INNER JOIN salvagebot.payment_logs pl ON bu.id = pl.user_id AND pl.created_date = first_pl.first_purchase_date WHERE bu.created_at >= SYSDATE - 365 """ try: cur.execute(time_to_purchase_query) time_to_purchase = cur.fetchone() except Exception as e: print(f"Error executing time_to_purchase query: {e}") print(f"Failed SQL: {time_to_purchase_query}") time_to_purchase = None return { 'monetization_funnel': funnel_result, 'ltv_analysis': ltv_analysis or [], 'service_profitability': service_profitability or [], 'time_to_purchase': time_to_purchase } except Exception as e: print(f"Error in monetization stats: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return { 'monetization_funnel': None, 'ltv_analysis': [], 'service_profitability': [], 'time_to_purchase': None } import asyncio return await asyncio.to_thread(_get_stats) async def get_biz_optimization_stats(self) -> dict: """Анализ возможностей оптимизации""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Анализ оттока пользователей churn_analysis_query = """ SELECT user_status, COUNT(*) as user_count, AVG(total_payments) as avg_revenue, COUNT(CASE WHEN total_payments > 0 THEN 1 END) as paying_users FROM ( SELECT CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 'Active (0-7 days)' WHEN last_interaction_date >= SYSDATE - 30 THEN 'Recent (8-30 days)' WHEN last_interaction_date >= SYSDATE - 90 THEN 'Dormant (31-90 days)' ELSE 'Churned (>90 days)' END as user_status, CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 WHEN last_interaction_date >= SYSDATE - 30 THEN 2 WHEN last_interaction_date >= SYSDATE - 90 THEN 3 ELSE 4 END as sort_order, total_payments FROM bot_users WHERE is_active = 1 AND last_interaction_date IS NOT NULL ) t GROUP BY user_status, sort_order ORDER BY sort_order """ try: cur.execute(churn_analysis_query) churn_analysis = cur.fetchall() except Exception as e: print(f"Error executing churn_analysis query: {e}") print(f"Failed SQL: {churn_analysis_query}") churn_analysis = [] # Неэффективные запросы inefficient_requests_query = """ SELECT service_type, COUNT(*) as total_requests, COUNT(CASE WHEN service_status = 'no_data' THEN 1 END) as no_data_requests, COUNT(CASE WHEN service_status = 'error' THEN 1 END) as error_requests, COUNT(CASE WHEN refund_status <> 'no_refund' THEN 1 END) as refunded_requests FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 GROUP BY service_type ORDER BY service_type """ try: cur.execute(inefficient_requests_query) inefficient_requests = cur.fetchall() except Exception as e: print(f"Error executing inefficient_requests query: {e}") print(f"Failed SQL: {inefficient_requests_query}") inefficient_requests = [] # Пользователи с высоким потенциалом high_potential_query = """ SELECT COUNT(CASE WHEN interaction_count >= 5 AND successful_payments_count = 0 THEN 1 END) as engaged_non_buyers, COUNT(CASE WHEN successful_payments_count = 1 AND total_payments >= 5 THEN 1 END) as potential_repeat_buyers, COUNT(CASE WHEN is_premium = 1 AND total_payments < 20 THEN 1 END) as underperforming_premium, COUNT(CASE WHEN total_payments >= 10 AND last_interaction_date < SYSDATE - 30 THEN 1 END) as valuable_dormant FROM bot_users WHERE is_active = 1 """ try: cur.execute(high_potential_query) high_potential = cur.fetchone() except Exception as e: print(f"Error executing high_potential query: {e}") print(f"Failed SQL: {high_potential_query}") high_potential = None # Анализ ценообразования pricing_analysis_query = """ SELECT service_type, payment_amount as price, COUNT(*) as purchase_count, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as successful_count, COUNT(CASE WHEN refund_status <> 'no_refund' THEN 1 END) as refund_count FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 60 GROUP BY service_type, payment_amount ORDER BY service_type, payment_amount """ try: cur.execute(pricing_analysis_query) pricing_analysis = cur.fetchall() except Exception as e: print(f"Error executing pricing_analysis query: {e}") print(f"Failed SQL: {pricing_analysis_query}") pricing_analysis = [] return { 'churn_analysis': churn_analysis or [], 'inefficient_requests': inefficient_requests or [], 'high_potential': high_potential, 'pricing_analysis': pricing_analysis or [] } except Exception as e: print(f"Error in optimization stats: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return { 'churn_analysis': [], 'inefficient_requests': [], 'high_potential': None, 'pricing_analysis': [] } import asyncio return await asyncio.to_thread(_get_stats) async def get_biz_recommendations_stats(self) -> dict: """Бизнес рекомендации на основе данных""" def _get_stats(): try: with self._pool.acquire() as conn: with conn.cursor() as cur: # Базовые метрики для рекомендаций base_metrics_query = """ SELECT COUNT(*) as total_users, COUNT(CASE WHEN total_payments > 0 THEN 1 END) as paying_users, AVG(total_payments) as avg_ltv, COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_users, COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users FROM bot_users WHERE is_active = 1 """ try: cur.execute(base_metrics_query) base_metrics = cur.fetchone() except Exception as e: print(f"Error executing base_metrics query: {e}") print(f"Failed SQL: {base_metrics_query}") base_metrics = None # Сервисная эффективность service_efficiency_query = """ SELECT service_type, COUNT(*) as total_requests, COUNT(CASE WHEN service_status = 'success' THEN 1 END) as successful_count, SUM(payment_amount) as revenue, COUNT(CASE WHEN refund_status != 'no_refund' THEN 1 END) as refunds FROM salvagebot.payment_logs WHERE payment_status = 'completed' AND created_date >= SYSDATE - 30 GROUP BY service_type ORDER BY service_type """ try: cur.execute(service_efficiency_query) service_efficiency = cur.fetchall() except Exception as e: print(f"Error executing service_efficiency query: {e}") print(f"Failed SQL: {service_efficiency_query}") service_efficiency = [] # Рост по регионам за последний месяц regional_growth_query = """ SELECT language_code, COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) as new_users_month, COUNT(*) as total_users, COUNT(CASE WHEN total_payments > 0 THEN 1 END) as paying_users FROM bot_users WHERE is_active = 1 GROUP BY language_code HAVING COUNT(*) >= 3 ORDER BY COUNT(CASE WHEN created_at >= SYSDATE - 30 THEN 1 END) DESC """ try: cur.execute(regional_growth_query) regional_growth = cur.fetchall() except Exception as e: print(f"Error executing regional_growth query: {e}") print(f"Failed SQL: {regional_growth_query}") regional_growth = [] return { 'base_metrics': base_metrics, 'service_efficiency': service_efficiency or [], 'regional_growth': regional_growth or [] } except Exception as e: print(f"Error in recommendations stats: {e}") import traceback print(f"Full traceback: {traceback.format_exc()}") return { 'base_metrics': None, 'service_efficiency': [], 'regional_growth': [] } import asyncio return await asyncio.to_thread(_get_stats)