savagedb_bot/utils/photo_utils.py

199 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Утилиты для работы с фотографиями автомобилей
"""
import asyncio
import logging
import os
from typing import List
from aiogram.types import Message, InputMediaPhoto, FSInputFile
from config.settings import IMAGE_PATH
from utils.system_utils import is_windows
def convert_photo_path(db_path: str) -> str:
"""
Конвертирует путь к фотографии в зависимости от операционной системы
Args:
db_path: путь из базы данных в Linux формате (с /)
Returns:
str: полный путь к файлу с учетом OS и базового пути
"""
if not db_path:
return ""
# Убираем лишние пробелы из пути
db_path = db_path.strip()
# Базовый путь из константы
base_path = IMAGE_PATH
if is_windows():
# Конвертируем Linux пути в Windows формат
windows_path = db_path.replace('/', '\\')
full_path = f"{base_path}\\{windows_path}"
logging.info(f"Converted path for Windows: {db_path} -> {full_path}")
return full_path
else:
# Для Linux/macOS оставляем как есть
full_path = f"{base_path}/{db_path}"
logging.info(f"Path for Linux/macOS: {db_path} -> {full_path}")
return full_path
def prepare_photo_paths(db_paths: List[str]) -> List[str]:
"""
Подготавливает список полных путей к фотографиям
Args:
db_paths: список путей из базы данных
Returns:
list: список полных путей к файлам
"""
if not db_paths:
return []
full_paths = []
for db_path in db_paths:
full_path = convert_photo_path(db_path)
if full_path:
full_paths.append(full_path)
logging.info(f"Prepared {len(full_paths)} photo paths from {len(db_paths)} database paths")
return full_paths
async def send_vehicle_photos(message: Message, vin: str, photo_paths: List[str], make: str, model: str, year: str):
"""
Отправляет фотографии автомобиля пользователю
Args:
message: сообщение пользователя
vin: VIN автомобиля
photo_paths: список полных путей к фотографиям
make, model, year: информация об автомобиле
"""
if not photo_paths:
await message.answer("❌ No photos found to send.")
return
try:
# Дебаг информация о текущем пользователе (только для Unix систем)
if not is_windows():
try:
import pwd
import grp
current_user = pwd.getpwuid(os.getuid())
current_groups = [grp.getgrgid(gid).gr_name for gid in os.getgroups()]
logging.info(f"DEBUG: Running as user: {current_user.pw_name}({os.getuid()}), groups: {current_groups}")
except ImportError:
# pwd и grp модули недоступны на Windows
pass
# Telegram позволяет максимум 10 фотографий в media group
photos_per_group = 10
total_photos = len(photo_paths)
logging.info(f"Attempting to send {total_photos} photos for VIN: {vin}")
# Разбиваем фотографии на группы по 10
photo_groups = [photo_paths[i:i + photos_per_group] for i in range(0, len(photo_paths), photos_per_group)]
total_groups = len(photo_groups)
logging.info(f"Split {total_photos} photos into {total_groups} groups")
sent_count = 0
for group_num, photo_group in enumerate(photo_groups, 1):
try:
logging.info(f"Processing group {group_num}/{total_groups} with {len(photo_group)} photos")
# Создаем media group для текущей группы
media_group = []
for i, photo_path in enumerate(photo_group):
try:
# Дебаг информация о файле (только для Unix систем)
if not is_windows():
try:
import stat
import pwd
import grp
stat_info = os.stat(photo_path)
file_owner = pwd.getpwuid(stat_info.st_uid).pw_name
file_group = grp.getgrgid(stat_info.st_gid).gr_name
file_perms = oct(stat_info.st_mode)[-3:]
logging.info(f"DEBUG: File {photo_path} - owner: {file_owner}({stat_info.st_uid}), group: {file_group}({stat_info.st_gid}), perms: {file_perms}")
except Exception as debug_e:
logging.warning(f"DEBUG: Cannot get file info for {photo_path}: {debug_e}")
if os.path.exists(photo_path):
# Создаем InputMediaPhoto
if i == 0 and group_num == 1:
# Первая фотография первой группы с полным описанием
if make == "UNKNOWN" and model == "UNKNOWN" and year == "UNKNOWN":
caption = f"📸 Vehicle damage photos\n📋 VIN: {vin}\n📊 Total photos: {total_photos}\n🗂️ Group {group_num}/{total_groups}"
else:
caption = f"📸 {year} {make} {model}\n📋 VIN: {vin}\n📊 Total photos: {total_photos}\n🗂️ Group {group_num}/{total_groups}"
elif i == 0:
# Первая фотография других групп с номером группы
caption = f"🗂️ Group {group_num}/{total_groups}"
else:
# Остальные фотографии без описания
caption = None
if caption:
media_group.append(InputMediaPhoto(
media=FSInputFile(photo_path),
caption=caption
))
else:
media_group.append(InputMediaPhoto(
media=FSInputFile(photo_path)
))
sent_count += 1
logging.info(f"Added photo {sent_count}/{total_photos}: {photo_path}")
else:
logging.warning(f"Photo file not found: {photo_path}")
except Exception as photo_error:
logging.error(f"Error processing photo {sent_count + 1} ({photo_path}): {photo_error}")
continue
if media_group:
# Отправляем media group
await message.answer_media_group(media_group)
logging.info(f"Successfully sent group {group_num}/{total_groups} with {len(media_group)} photos")
# Небольшая пауза между группами для избежания rate limiting
if group_num < total_groups:
await asyncio.sleep(0.5)
else:
logging.warning(f"No valid photos in group {group_num}")
except Exception as group_error:
logging.error(f"Error sending photo group {group_num}: {group_error}")
await message.answer(f"❌ Error sending photo group {group_num}. Continuing with remaining photos...")
continue
if sent_count > 0:
# Отправляем итоговое сообщение
await message.answer(
f"✅ **Photos sent successfully!**\n"
f"📊 **{sent_count} of {total_photos} photos** delivered\n"
f"🗂️ **{total_groups} photo groups** sent"
)
logging.info(f"Successfully sent {sent_count}/{total_photos} photos for VIN: {vin}")
else:
# Если ни одна фотография не найдена
await message.answer(
"❌ **Error:** Photo files not found on server.\n"
"Please contact support with your transaction details."
)
logging.error(f"No valid photo files found for VIN: {vin}")
except Exception as e:
logging.error(f"Error sending photos for VIN {vin}: {e}")
await message.answer(
"❌ **Error sending photos.** Please contact support with your transaction details.\n"
f"Error details: {str(e)}"
)