215 lines
10 KiB
Python
215 lines
10 KiB
Python
"""
|
||
Управление пользователями и их данными
|
||
"""
|
||
import logging
|
||
from typing import Optional, Dict
|
||
from aiogram.types import User
|
||
|
||
from .base import OracleDatabase
|
||
|
||
|
||
class UserManager(OracleDatabase):
    """Manage bot users and their stored data in the ``bot_users`` table.

    Each public coroutine defines a synchronous worker and passes it to the
    inherited ``self._execute_query`` (presumably this runs the worker off the
    event loop -- confirm in ``OracleDatabase``). A worker logs SQL-level
    details and re-raises; the surrounding coroutine catches the error, logs
    it once more and returns a failure value instead of propagating.
    """

    async def save_user(self, user: User, interaction_source: str = "bot") -> bool:
        """Create the user's row on first contact, otherwise refresh it.

        On update the profile fields are overwritten, ``last_interaction_date``
        is set to ``SYSDATE`` and ``interaction_count`` is incremented.

        Args:
            user: Telegram user whose profile fields are persisted.
            interaction_source: Written to ``registration_source`` when the
                row is created; not used when the row already exists.

        Returns:
            The worker's result (``True`` after a successful commit) as
            returned by ``_execute_query``, or ``False`` if any error
            occurred (the error is logged, not raised).
        """
        update_query = """
            UPDATE bot_users SET
                first_name = :first_name,
                last_name = :last_name,
                username = :username,
                language_code = :language_code,
                is_premium = :is_premium,
                added_to_attachment_menu = :added_to_attachment_menu,
                last_interaction_date = SYSDATE,
                interaction_count = interaction_count + 1
            WHERE id = :user_id
        """
        insert_query = """
            INSERT INTO bot_users (
                id, first_name, last_name, username, language_code,
                is_bot, is_premium, added_to_attachment_menu,
                registration_source, first_interaction_date,
                last_interaction_date, interaction_count
            ) VALUES (
                :user_id, :first_name, :last_name, :username, :language_code,
                :is_bot, :is_premium, :added_to_attachment_menu,
                :registration_source, SYSDATE, SYSDATE, 1
            )
        """

        def _save_user():
            try:
                # Binds shared by both statements. Boolean flags are stored
                # as 1/0; a missing (None) flag is stored as 0.
                profile = {
                    "user_id": user.id,
                    "first_name": user.first_name,
                    "last_name": user.last_name,
                    "username": user.username,
                    "language_code": user.language_code,
                    "is_premium": 1 if user.is_premium else 0,
                    "added_to_attachment_menu": 1 if user.added_to_attachment_menu else 0,
                }
                with self._get_connection() as conn:
                    with conn.cursor() as cur:
                        # Decide between UPDATE and INSERT by probing for the row.
                        cur.execute(
                            "SELECT id FROM bot_users WHERE id = :user_id",
                            {"user_id": user.id},
                        )
                        if cur.fetchone():
                            cur.execute(update_query, profile)
                        else:
                            # The extra keys are added only here so that each
                            # bind dict contains exactly the placeholders its
                            # statement references.
                            cur.execute(
                                insert_query,
                                {
                                    **profile,
                                    "is_bot": 1 if user.is_bot else 0,
                                    "registration_source": interaction_source,
                                },
                            )
                    conn.commit()
                    return True
            except Exception as e:
                logging.error("SQL Error in save_user:")
                logging.error(f"User ID: {user.id}")
                logging.error(f"Error: {e}")
                raise

        try:
            return await self._execute_query(_save_user)
        except Exception as e:
            logging.error(f"Error saving user {user.id}: {e}")
            return False

    async def update_user_payment(self, user_id: int, payment_amount: float) -> bool:
        """Add a successful payment to the user's running totals.

        Increases ``total_payments`` by ``payment_amount``, increments
        ``successful_payments_count`` and sets ``last_interaction_date`` to
        ``SYSDATE``.

        Args:
            user_id: Value of ``bot_users.id`` for the paying user.
            payment_amount: Amount added to ``total_payments``.

        Returns:
            ``True`` if a row was updated, ``False`` if no row matched
            ``user_id`` or an error occurred (the error is logged, not raised).
        """
        # NVL guards against NULL counters: the INSERT in save_user does not
        # set these columns, and in Oracle NULL + n is NULL, so without the
        # guard a never-initialised total would stay NULL after every payment.
        # (If the schema declares DEFAULT 0 the NVL is a harmless no-op.)
        update_query = """
            UPDATE bot_users SET
                total_payments = NVL(total_payments, 0) + :amount,
                successful_payments_count = NVL(successful_payments_count, 0) + 1,
                last_interaction_date = SYSDATE
            WHERE id = :user_id
        """

        def _update_payment():
            try:
                with self._get_connection() as conn:
                    with conn.cursor() as cur:
                        cur.execute(
                            update_query,
                            {"user_id": user_id, "amount": payment_amount},
                        )
                        # Read the count right after execute so the result
                        # does not depend on cursor state after the commit.
                        updated_rows = cur.rowcount
                    conn.commit()
                    return updated_rows > 0
            except Exception as e:
                logging.error("SQL Error in update_user_payment:")
                logging.error(f"User ID: {user_id}, Amount: {payment_amount}")
                logging.error(f"Error: {e}")
                raise

        try:
            return await self._execute_query(_update_payment)
        except Exception as e:
            logging.error(f"Error updating payment for user {user_id}: {e}")
            return False

    async def get_user_stats(self, user_id: int) -> Optional[Dict]:
        """Fetch the stored statistics of a single user.

        Args:
            user_id: Value of ``bot_users.id`` to look up.

        Returns:
            A dict with the keys ``first_name``, ``last_name``, ``username``,
            ``language_code``, ``is_premium`` (bool), ``interaction_count``,
            ``total_payments`` (float, ``0.0`` when empty),
            ``successful_payments_count``, ``first_interaction_date`` and
            ``last_interaction_date``; ``None`` if the user does not exist or
            an error occurred (the error is logged, not raised).
        """
        query = """
            SELECT
                first_name, last_name, username, language_code,
                is_premium, interaction_count, total_payments,
                successful_payments_count, first_interaction_date,
                last_interaction_date
            FROM bot_users
            WHERE id = :user_id
        """

        def _get_stats():
            try:
                with self._get_connection() as conn:
                    with conn.cursor() as cur:
                        cur.execute(query, {"user_id": user_id})
                        row = cur.fetchone()

                if not row:
                    return None

                # Column order matches the SELECT list above.
                (
                    first_name,
                    last_name,
                    username,
                    language_code,
                    is_premium,
                    interaction_count,
                    total_payments,
                    payments_count,
                    first_seen,
                    last_seen,
                ) = row
                return {
                    "first_name": first_name,
                    "last_name": last_name,
                    "username": username,
                    "language_code": language_code,
                    "is_premium": bool(is_premium),
                    "interaction_count": interaction_count,
                    # NULL or zero totals are both reported as 0.0.
                    "total_payments": float(total_payments) if total_payments else 0.0,
                    "successful_payments_count": payments_count,
                    "first_interaction_date": first_seen,
                    "last_interaction_date": last_seen,
                }
            except Exception as e:
                logging.error("SQL Error in get_user_stats:")
                logging.error(f"User ID: {user_id}")
                logging.error(f"Error: {e}")
                raise

        try:
            return await self._execute_query(_get_stats)
        except Exception as e:
            logging.error(f"Error getting stats for user {user_id}: {e}")
            return None

    async def get_users_summary(self) -> Dict:
        """Aggregate overall statistics across rows with ``is_active = 1``.

        Returns:
            A dict with the keys ``total_users``, ``premium_users``,
            ``total_revenue`` (float), ``total_transactions``,
            ``active_last_24h`` and ``active_last_week``, where NULL
            aggregates are reported as zero. An empty dict is returned if an
            error occurred (the error is logged, not raised).
        """
        # SYSDATE - 1 / SYSDATE - 7 are "one day ago" / "seven days ago".
        summary_query = """
            SELECT
                COUNT(*) as total_users,
                COUNT(CASE WHEN is_premium = 1 THEN 1 END) as premium_users,
                SUM(total_payments) as total_revenue,
                SUM(successful_payments_count) as total_transactions,
                COUNT(CASE WHEN last_interaction_date >= SYSDATE - 1 THEN 1 END) as active_last_24h,
                COUNT(CASE WHEN last_interaction_date >= SYSDATE - 7 THEN 1 END) as active_last_week
            FROM bot_users
            WHERE is_active = 1
        """

        def _get_summary():
            try:
                with self._get_connection() as conn:
                    with conn.cursor() as cur:
                        cur.execute(summary_query)
                        row = cur.fetchone()

                # SUM over zero rows yields NULL, hence the fallbacks to 0.
                return {
                    "total_users": row[0] or 0,
                    "premium_users": row[1] or 0,
                    "total_revenue": float(row[2]) if row[2] else 0.0,
                    "total_transactions": row[3] or 0,
                    "active_last_24h": row[4] or 0,
                    "active_last_week": row[5] or 0,
                }
            except Exception as e:
                logging.error("SQL Error in get_users_summary:")
                logging.error(f"Error: {e}")
                raise

        try:
            return await self._execute_query(_get_summary)
        except Exception as e:
            logging.error(f"Error getting users summary: {e}")
            return {}