from flask import Flask, render_template, send_from_directory, make_response, request, redirect, session, g, json, send_file, abort, jsonify import os import random import socket import oracledb import traceback import requests from logging.config import dictConfig import secrets from werkzeug.middleware.proxy_fix import ProxyFix import io import json from expiring_dict import ExpiringDict import uuid from flask_swagger_ui import get_swaggerui_blueprint import datetime from reportlab.pdfgen import canvas from reportlab.lib.pagesizes import A4 from reportlab.lib import colors from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import inch, mm from reportlab.pdfgen.canvas import Canvas from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT from reportlab.platypus import Frame import sys import logging capcha_score: float = os.environ.get('CAPCHA_SCORE',0.5) capcha_site = '6LcJpHMgAAAAAMQLNY_g8J2Kv_qmCGureRN_lbGl' capcha_site_sec = '6LcJpHMgAAAAAIUf4Jg_7NvawQKZoLoVypDU6-d8' capcha_site_url='https://www.google.com/recaptcha/api/siteverify' site = 'salvagedb.com' app_path = os.path.dirname(os.path.realpath(__file__)) app = Flask(__name__) app_debug : bool = os.environ.get('APP_DEBUG',False) app.debug = os.environ.get('APP_DEBUG',False) app.secret_key = secrets.token_hex() app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) os.environ['NLS_LANG'] = 'American_America.AL32UTF8' app.cache = ExpiringDict(60*60*24) # Swagger UI SWAGGER_URL = '/api/docs' API_URL = '/api/swagger.yaml' swaggerui_blueprint = get_swaggerui_blueprint( SWAGGER_URL, API_URL, config={ 'app_name': "SalvageDB API" } ) app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL) try: gips = open('gips.txt').read().split('\n') except: gips=[] if app_debug: print(gips) def save_request(request): req_data = {} req_data['endpoint'] = request.endpoint req_data['method'] = request.method req_data['cookies'] = request.cookies req_data['data'] = request.data req_data['headers'] = dict(request.headers) req_data['headers'].pop('Cookie', None) req_data['args'] = request.args req_data['form'] = request.form req_data['remote_addr'] = request.remote_addr return req_data # Создаем директорию для логов если её нет log_dir = os.path.join(app_path, 'logs') if not os.path.exists(log_dir): os.makedirs(log_dir) # Определяем уровень логирования в зависимости от окружения log_level = 'DEBUG' if app_debug else 'INFO' dictConfig({ "version": 1, "disable_existing_loggers": False, "formatters": { "default": { "format": "[%(asctime)s] [%(levelname)s] [%(name)s:%(lineno)d] %(message)s", "datefmt": "%Y-%m-%d %H:%M:%S" }, "error": { "format": "[%(asctime)s] [%(levelname)s] [%(name)s:%(lineno)d] [%(pathname)s] %(message)s", "datefmt": "%Y-%m-%d %H:%M:%S" } }, "handlers": { "console": { "class": "logging.StreamHandler", "formatter": "default", "stream": "ext://sys.stdout" }, "file": { "class": "logging.handlers.RotatingFileHandler", "formatter": "default", "filename": os.path.join(log_dir, "app.log"), "maxBytes": 10485760, # 10MB "backupCount": 10, "encoding": "utf8" }, "error_file": { "class": "logging.handlers.RotatingFileHandler", "formatter": "error", "filename": os.path.join(log_dir, "error.log"), "maxBytes": 10485760, # 10MB "backupCount": 10, "encoding": "utf8", "level": "ERROR" } }, "loggers": { "werkzeug": { "handlers": ["console", "file"], "level": log_level, "propagate": False }, "flask": { "handlers": ["console", "file"], "level": log_level, "propagate": False }, "app": { "handlers": ["console", "file", "error_file"], "level": log_level, "propagate": False } }, "root": { "level": log_level, "handlers": ["console", "file", "error_file"] } }) # Создаем логгер для приложения logger = logging.getLogger("app") class OverlayCanvas(Canvas): def __init__(self, *args, **kwargs): Canvas.__init__(self, *args, **kwargs) self._saved_page_states = [] def showPage(self): self._saved_page_states.append(dict(self.__dict__)) self._startPage() def save(self): for state in self._saved_page_states: self.__dict__.update(state) self.draw_overlay() # теперь рисуем поверх Canvas.showPage(self) Canvas.save(self) def draw_overlay(self): stamp_path = os.path.join(app_path, 'static', 'stamp256.png') if os.path.exists(stamp_path): self.drawImage( stamp_path, x=400, # настроить по ширине y=400, # настроить по высоте width=100, height=100, mask='auto' ) @app.after_request def after_request(response): if request.cookies.get('user_id', None) == None: response.set_cookie('user_id', str(random.randint(0, 10000000000)), max_age=60 * 60 * 24 * 365 * 10) return response def init_session(connection, requestedTag_ignored): pass # cursor = connection.cursor() # cursor.execute(""" # ALTER SESSION SET # TIME_ZONE = 'UTC' # NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI'""") def start_pool(): pool_min = 4 pool_max = 4 pool_inc = 0 if app_debug: print('Connecting to {}:{}/{}'.format(os.environ.get("DB_HOST"), os.environ.get("DB_PORT"), os.environ.get("DB_DBNAME"))) pool = oracledb.create_pool( user=os.environ.get("DB_USERNAME"), password=os.environ.get("DB_PASSWORD"), dsn='{}:{}/{}'.format(os.environ.get("DB_HOST"), os.environ.get("DB_PORT"), os.environ.get("DB_DBNAME")), #params=sample_env.get_pool_params(), min=pool_min, max=pool_max, increment=pool_inc, session_callback=init_session, ) return pool @app.route("/sitemap.xml") def sitemap(): conn = pool.acquire() cur = conn.cursor() cur.execute('select distinct trunc(pg/40000)+1 nm from mv_pages'); nm = cur.fetchall() return render_template('sitemap.xml', site=site, siten=nm) @app.route("/sitemaps/.xml") def sitemaps_xml(sitemap_id): try: id: int = int(sitemap_id) conn = pool.acquire() cur = conn.cursor() cur.execute('select pg from mv_pages where sitemap_number = :p1', {'p1': id}) sitemap_pages = cur.fetchall() return render_template('sitemaps.xml', site=site, sitemap_pages=sitemap_pages) except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/") def index_html(): try: if 'maxnum' in app.cache: if app_debug: print('Find in cache max_num={}'.format(app.cache['maxnum'])) cnt = app.cache['maxnum'] else: conn = pool.acquire() cur = conn.cursor() cur.execute('select max(num) from salvagedb') cnt = "{:,}".format(cur.fetchone()[0]) app.cache['maxnum'] = cnt return render_template('index.html', site=site, cnt=cnt ,capcha_site=capcha_site) except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/privacy.html") def privacy_html(): try: return render_template('privacy.html', site=site) except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/robots.txt") def robot_txt(): try: return render_template('robots.txt', site=site) except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/decode", methods = ['POST']) def decode(): try: vin = request.form.get('q').strip() g_respone = request.form['g-recaptcha-response'] capcha_check = requests.post(url=f'{capcha_site_url}?secret={capcha_site_sec}&response={g_respone}').json() if capcha_check['success'] == False or capcha_check['score'] .html") def database_page_prc(page_num): try: pg = int(page_num) conn = pool.acquire() cur = conn.cursor() user_ip = get_ip(request) ## определение ip клиента try: returnVal = cur.callfunc("checkip", int, [user_ip, request.headers.get("CF-IPCountry", 'None'), get_addr(user_ip), 1, 0, 0]) except: print(request) logger.error(traceback.format_exc()) cur.execute('select max(pg) from mv_pages') max_page = int(cur.fetchall()[0][0]) cur.execute("""select rownum,s.vin, COALESCE ((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make, COALESCE ((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model, COALESCE ((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year from salvagedb s where num between (select mi from mv_pages where pg = :p1) and (select ma from mv_pages where pg = :p1)""", {'p1': pg}) pgf = cur.fetchall() return render_template('database.html', site=site, cur_page=pg, max_page=max_page, pg=pgf) except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/detail/.html") def detail_vin(vin): try: conn = pool.acquire() cur = conn.cursor() user_ip = get_ip(request) ## определение ip клиента try: returnVal = cur.callfunc("checkip", int, [user_ip, request.headers.get("CF-IPCountry", 'None'), get_addr(user_ip), 0, 0, 1]) except: print(request) logger.error(traceback.format_exc()) cur.execute("""select 'None', COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='5'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Body Class'),'UNKNOWN') body, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='13'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Model'),'UNKNOWN') engine, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='9'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Number of Cylinders'),'UNKNOWN') celindr, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='15'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Drive Type'),'-') drive from (select substr(:p1,1,10) svin, :p1 vin from dual) s """, {'p1': vin}) res = cur.fetchall() return render_template('details.html', site=site, vin=vin, det=res, capcha_site=capcha_site) except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route('/search', methods = ['POST']) def search(): try: user_ip = get_ip(request) ## определение ip клиента vin = request.form.get('q') ua = request.headers.get('User-Agent') logger.info(f'AgeNt: {ua}') g_respone = request.form.get('g-recaptcha-response') capcha_check = requests.post(url=f'{capcha_site_url}?secret={capcha_site_sec}&response={g_respone}').json() if capcha_check['success'] == False or capcha_check['score'] 16 or access_code is None: ret = {'status': 'incorrect access_code'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response conn = pool.acquire() cur = conn.cursor() ## проверяем access_code cur.execute( 'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1', {'p1': str(access_code)}) res = cur.fetchone() summ = res[0] found_price = res[1] notfound_price = res[2] decode_price = res[3] ### не достаточно средств if summ <= 0: ret = {'status': 'You have insufficient balance.'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response ## ищем в истории cur.execute('select t.odo,t.odos,t.title,t.dem1,t.dem2,t.year,t.month from salvagedb t where vin =:p1 and svin = :p2', {'p1': vin.upper(), 'p2': vin.upper()[:10]}) res = cur.fetchall() if len(res) > 0: # found dat = [] for it in res: dat.append({ 'odometer': it[0], 'odometer_status': it[1], 'title': it[2], 'damage1': it[3], 'damage2': it[4], 'add_to_db': '{}-{}'.format(it[5], it[6]) }) ret = { 'status': 'found', 'vin': vin.upper(), 'cost': found_price, 'records': dat } response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)", {'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'FOUND', 'p5': found_price}) conn.commit() return response else: # nor found ret = {'status': 'NOT FOUND', 'cost': notfound_price} if app.debug: logger.debug(json.dumps(ret)) response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)", {'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'NOT FOUND', 'p5': notfound_price}) conn.commit() return response except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/api/restfact") def api_restfact(): access_code = request.args.get('access_code', None) if len(access_code) > 16 or access_code is None: ret = {'status': 'incorrect access_code'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response conn = pool.acquire() cur = conn.cursor() cur.callproc('make_billing') cur.execute( 'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1', {'p1': str(access_code)}) res = cur.fetchone() summ = res[0] if len(res) <= 0: ret = {'status': 'incorrect access_code'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response else: ret = { 'access_code': access_code, 'restfact': summ } response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response @app.route("/api/restfact_detail") def restfact_detail(): access_code = request.args.get('access_code', None) if len(access_code) > 16 or access_code is None: ret = {'status': 'incorrect access_code'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response conn = pool.acquire() cur = conn.cursor() cur.callproc('make_billing') cur.execute('select rownum from restfact t where access_code = :p1', {'p1': str(access_code)}) res = cur.fetchall() if len(res) <= 0: ret = {'status': 'incorrect access_code'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response cur.execute("select vin, ip, status, dt, cost from billing where access_code = :p1 and dt > CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE) - 45 and made = 1 order by id desc", {'p1': str(access_code)}) res = cur.fetchall() if len(res) <= 0: ret = {'status': 'billing not found'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response else: dat = [] for it in res: dat.append( { 'VIN': it[0], 'IP': it[1], 'STATUS': it[2], 'DATETIME': it[3].isoformat(), 'COST': it[4] } ) ret = { 'access_code': access_code, 'report': dat } response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response ## API V2 @app.route("/api/v2/search") def api_search_v2(): try: access_code = request.args.get('access_code', None) vin = request.args.get('vin', None) user_ip = get_ip(request) ## определение ip клиента # базовые проверки входящих аргументов if len(vin) != 17 or vin is None: ret = {'status': 'incorrect vin'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response if len(access_code) > 16 or access_code is None: ret = {'status': 'incorrect access_code'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response conn = pool.acquire() cur = conn.cursor() ## проверяем access_code cur.execute( 'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1', {'p1': str(access_code)}) res = cur.fetchone() summ = res[0] found_price = res[1] notfound_price = res[2] decode_price = res[3] ### не достаточно средств if summ <= 0: ret = {'status': 'You have insufficient balance.'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response ## ищем в истории cur.execute('''select s.odo,s.odos,s.title,s.dem1,s.dem2,s.year,s.month, json_value(i.jdata, '$.Runs_Drive') RD_Status, json_value(i.jdata, '$.Locate') Sale_Location, json_value(i.jdata, '$.RepCost') as Repear_cost, (select count(1) from salvage_images si where si.vin = s.vin and fn = 1) image_count from salvagedb.salvagedb s left join addinfo i on s.num = i.numid where vin =:p1 and svin = :p2''', {'p1': vin.upper(), 'p2': vin.upper()[:10]}) res = cur.fetchall() if len(res) > 0: # found dat = [] for it in res: dat.append({ 'odometer': it[0], 'odometer_status': it[1], 'title': it[2], 'damage1': it[3], 'damage2': it[4], 'add_to_db': '{}-{}'.format(it[5], it[6]), 'RD_Status':it[7], 'Sale_Location':it[8], 'Repear_Cost':it[9], 'Photo_Count':it[10] }) ret = { 'status': 'found', 'vin': vin.upper(), 'cost': found_price, 'records': dat } response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)", {'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'FOUND', 'p5': found_price}) conn.commit() return response else: # nor found ret = {'status': 'NOT FOUND', 'cost': notfound_price} if app.debug: logger.debug(json.dumps(ret)) response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)", {'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'NOT FOUND', 'p5': notfound_price}) conn.commit() return response except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/api/v2/reqphoto") def api_reqimage(): try: access_code = request.args.get('access_code', None) vin = request.args.get('vin', None) # user_ip = get_ip(request) ## определение ip клиента # базовые проверки входящих аргументов if len(vin) != 17 or vin is None: ret = {'status': 'incorrect vin'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response if len(access_code) > 16 or access_code is None: ret = {'status': 'incorrect access_code'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response conn = pool.acquire() cur = conn.cursor() ## проверяем access_code cur.execute( 'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1', {'p1': str(access_code)}) res = cur.fetchone() summ = res[0] found_price = res[1] notfound_price = res[2] decode_price = res[3] if summ <= 0: ret = {'status': 'You have insufficient balance.'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response cur.execute('select count(*) from salvagedb.salvage_images where vin = :p1 and fn = 1' , {'p1':vin.upper()}) res = cur.fetchone() img_count = int(res[0]) if img_count<1: ret = {'status': 'Photos not found for this vin.'} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) if app.debug: logger.debug(json.dumps(ret)) return response else: cur.execute("""select GenImages('{}','{}') as ui from dual""".format(access_code, vin.upper())) res = cur.fetchall() req_id = res[0][0] cur.execute('select fake_name from salvage_images_req where req_id = :p1', {'p1':req_id}) res = cur.fetchall() images = [] nm = 0 for it in res: images.append({ 'num':nm, 'url':'https://salvagedb.com/simages/{}/{}'.format(req_id, it[0]) }) nm = nm + 1 ret = {'status': 'Image_found', 'Images':images} response = app.response_class( response=json.dumps(ret), status=200, mimetype='application/json' ) return response except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route("/ads.txt") def ads_txt(): try: return render_template('ads.txt') except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route('/favicon.ico') def logo(): return send_file(app_path+"/static/favicon.ico") @app.route('/limit') def rate_limit(): try: return render_template('rate_limit.html', site=site) except: logger.error(traceback.format_exc()) return 'bad request!', 500 @app.route('/donate') def donate(): return render_template('donate.html') def get_ip(req) -> str: if 'X-Forwarded-For' in req.headers: proxy_data = req.headers['X-Forwarded-For'] ip_list = proxy_data.split(',') user_ip = req.headers.get("Cf-Connecting-Ip") or ip_list[0] # first address in list is User IP else: user_ip = req.headers.get("Cf-Connecting-Ip") or req.remote_addr return user_ip def get_addr(req) -> str: try: return socket.gethostbyaddr(req)[0] except: return req @app.route('/api/swagger.yaml') def swagger_yaml(): return send_from_directory('api', 'swagger.yaml') @app.route('/static/') def serve_static(filename): try: # Проверка расширения файла allowed_extensions = {'.css', '.js', '.png', '.jpg', '.jpeg', '.gif', '.ico', '.svg', '.json'} file_ext = os.path.splitext(filename)[1].lower() if file_ext not in allowed_extensions: logger.warning(f'Attempt to access forbidden file type: {filename}') return 'Access denied', 403 # Проверка пути на directory traversal safe_path = os.path.normpath(os.path.join('static', filename)) if not safe_path.startswith('static'): logger.warning(f'Attempt to access file outside static directory: {filename}') return 'Access denied', 403 # Определение MIME-типа mime_types = { '.css': 'text/css', '.js': 'application/javascript', '.json': 'application/json', '.png': 'image/png', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.gif': 'image/gif', '.ico': 'image/x-icon', '.svg': 'image/svg+xml' } mime_type = mime_types.get(file_ext, 'application/octet-stream') # Логирование доступа logger.info(f'Access to static file: {filename}') response = make_response(send_from_directory('static', filename)) response.headers['Content-Type'] = mime_type # Специальные заголовки для PWA файлов if filename == 'manifest.json': response.headers['Content-Type'] = 'application/manifest+json' elif filename == 'sw.js': response.headers['Service-Worker-Allowed'] = '/' return response except Exception as e: logger.error(f'Error accessing file {filename}: {str(e)}') return 'File not found', 404 @app.route("/salvagereport/") def generate_pdf_report(vin): try: # Проверяем наличие VIN в сессии и время последнего поиска if 'last_searched_vin' not in session or session['last_searched_vin'] != vin: logger.warning(f'Direct access attempt to report generation for VIN: {vin}') return 'Access denied', 403 # Проверяем время последнего поиска (не более 5 минут) if datetime.datetime.now().timestamp() - session['last_search_time'] > 300: logger.warning(f'Report generation attempt expired for VIN: {vin}') return 'Access denied', 403 conn = pool.acquire() cur = conn.cursor() user_ip = get_ip(request) try: returnVal = cur.callfunc("checkip", int, [user_ip, request.headers.get("CF-IPCountry", 'None'), get_addr(user_ip), 1, 0, 0]) except: logger.error(traceback.format_exc()) # Get vehicle details cur.execute("""select 'None', COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='5'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Body Class'),'UNKNOWN') body, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='13'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Model'),'UNKNOWN') engine, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='9'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Number of Cylinders'),'UNKNOWN') celindr, COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='15'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Drive Type'),'-') drive from (select substr(:p1,1,10) svin, :p1 vin from dual) s """, {'p1': vin}) det = cur.fetchall() # Get salvage history cur.execute("""select rownum, t.vin, t.title, t.odo, t.odos, t.dem1, t.dem2, t.year||'/'||t.month from salvagedb t where vin = :p1 and svin = substr(:p1,1,10) """, {'p1': vin}) his = cur.fetchall() # Generate current date and report ID now = datetime.datetime.now() report_date = now.strftime("%Y-%m-%d") # Только дата без времени report_id = uuid.uuid4().hex[:8].upper() current_year = now.year try: # Создаем буфер для PDF buffer = io.BytesIO() # Создаем PDF документ doc = SimpleDocTemplate( buffer, pagesize=A4, rightMargin=20*mm, leftMargin=20*mm, topMargin=15*mm, bottomMargin=20*mm ) # Создаем стили для текста styles = getSampleStyleSheet() styles.add(ParagraphStyle( name='ReportTitle', parent=styles['Heading1'], fontSize=16, alignment=TA_CENTER )) styles.add(ParagraphStyle( name='ReportSubtitle', parent=styles['Normal'], fontSize=12, alignment=TA_CENTER, textColor=colors.HexColor('#0066CC') # Синий цвет для акцента )) styles.add(ParagraphStyle( name='Right', parent=styles['Normal'], fontSize=10, alignment=TA_RIGHT )) styles.add(ParagraphStyle( name='Disclaimer', parent=styles['Normal'], fontSize=8, textColor=colors.gray )) styles.add(ParagraphStyle( name='SalvageDBFooter', parent=styles['Normal'], fontSize=9, alignment=TA_CENTER, textColor=colors.HexColor('#0066CC'), # Синий цвет для акцента fontName='Helvetica-Bold' )) # Список элементов для PDF elements = [] # Путь к логотипу # logo_path = os.path.join(app_path, 'static', 'icons', 'icon-144x144.png') # Добавляем логотип, если файл существует # if os.path.exists(logo_path): # logo_img = Image(logo_path, width=30*mm, height=30*mm) # logo_img.hAlign = 'CENTER' # elements.append(logo_img) # elements.append(Spacer(1, 5*mm)) # Заголовок отчета с акцентом на SALVAGEDB.COM elements.append(Paragraph("Vehicle History Report", styles['ReportTitle'])) elements.append(Paragraph("SALVAGEDB.COM - Comprehensive Vehicle History Check", styles['ReportSubtitle'])) elements.append(Spacer(1, 10*mm)) # Информация об отчете elements.append(Paragraph(f"Report Date: {report_date}", styles['Right'])) elements.append(Paragraph(f"Report ID: {report_id}", styles['Right'])) elements.append(Spacer(1, 15*mm)) # Информация о транспортном средстве elements.append(Paragraph("Vehicle Information", styles['Heading2'])) # Создаем таблицу с информацией о транспортном средстве vehicle_data = [ ["VIN", vin], ["Make", det[0][1]], ["Model", det[0][2]], ["Year", det[0][3]], ["Body Style", det[0][4]], ["Engine", det[0][5]], ["Cylinders", det[0][6]], ["Drive", det[0][7]] ] vehicle_table = Table(vehicle_data, colWidths=[100, 350]) vehicle_table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (0, -1), colors.lightgrey), ('TEXTCOLOR', (0, 0), (0, -1), colors.black), ('ALIGN', (0, 0), (0, -1), 'LEFT'), ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (0, -1), 10), ('BOTTOMPADDING', (0, 0), (-1, -1), 6), ('BACKGROUND', (1, 0), (-1, -1), colors.white), ('GRID', (0, 0), (-1, -1), 0.5, colors.black), ])) elements.append(vehicle_table) elements.append(Spacer(1, 10*mm)) # Информация об истории if his: elements.append(Paragraph("Salvage History", styles['Heading2'])) # Создаем стиль для ячеек с переносом текста table_style = TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey), ('TEXTCOLOR', (0, 0), (-1, 0), colors.black), ('ALIGN', (0, 0), (-1, 0), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 9), ('BOTTOMPADDING', (0, 0), (-1, 0), 6), ('BACKGROUND', (0, 1), (-1, -1), colors.white), ('GRID', (0, 0), (-1, -1), 0.5, colors.black), ('VALIGN', (0, 0), (-1, -1), 'TOP'), ('LEFTPADDING', (0, 0), (-1, -1), 3), ('RIGHTPADDING', (0, 0), (-1, -1), 3), ('FONTSIZE', (0, 1), (-1, -1), 8), ('WORDWRAP', (0, 0), (-1, -1), True), ]) # Оптимизированные заголовки без VIN history_headers = ["Title", "Mileage", "Odometer Status", "Primary\nDamage", "Secondary\nDamage"] # Формируем данные для таблицы истории history_data = [history_headers] for item in his: title_value = item[2] primary_damage = item[5] if item[5] else "N/A" secondary_damage = item[6] if item[6] else "N/A" # Добавляем строку с данными, убрав VIN history_data.append([ title_value, str(item[3]), item[4], primary_damage, secondary_damage ]) # Устанавливаем ширину колонок оптимально для A4 available_width = doc.width # Доступная ширина страницы col_widths = [ available_width * 0.20, # 20% для Title available_width * 0.12, # 12% для Mileage available_width * 0.18, # 18% для Odometer Status available_width * 0.25, # 25% для Primary Damage available_width * 0.25 # 25% для Secondary Damage ] # Создаем таблицу с правильными размерами history_table = Table(history_data, colWidths=col_widths, repeatRows=1) history_table.setStyle(table_style) elements.append(history_table) else: elements.append(Paragraph("Salvage History", styles['Heading2'])) elements.append(Paragraph("Salvage history not found.", styles['Normal'])) elements.append(Spacer(1, 15*mm)) # Добавляем штамп поверх таблицы # stamp_path = os.path.join(app_path, 'static', 'stamp256.png') # if os.path.exists(stamp_path): # stamp_img = Image(stamp_path, width=100, height=100) # stamp_img.hAlign = 'RIGHT' # stamp_img.vAlign = 'TOP' # # Размещаем штамп поверх таблицы # elements.append(stamp_img) # elements.append(Spacer(1, 15*mm)) # Дисклеймер disclaimer_text = """ Disclaimer: This report provides information about salvage or junk vehicles; damage from hail, flood or fire; mileage discrepancies or odometer rollback; and gray market vehicles. We do not claim that the car got in our databank has salvage title, but the fact that it has been damaged for sure. Our site helps people avoid buying a damaged vehicle in the past. """ elements.append(Paragraph(disclaimer_text, styles['Disclaimer'])) # Футер с акцентом на SALVAGEDB.COM elements.append(Spacer(1, 15*mm)) elements.append(Paragraph(f"© {current_year} SALVAGEDB.COM - All Rights Reserved", styles['SalvageDBFooter'])) elements.append(Paragraph("Visit SALVAGEDB.COM for more information about vehicle history", styles['SalvageDBFooter'])) elements.append(Paragraph('"This report is provided as is without any guarantees or warranty."', styles['Disclaimer'])) # Строим PDF документ # doc.build(elements, onFirstPage=add_stamp_overlay, onLaterPages=add_stamp_overlay) doc.build(elements, canvasmaker=OverlayCanvas) # Получаем содержимое буфера pdf_data = buffer.getvalue() buffer.close() # Создаем HTTP-ответ с PDF response = make_response(pdf_data) response.headers['Content-Type'] = 'application/pdf' response.headers['Content-Disposition'] = f'attachment; filename=vehicle_report_{vin}.pdf' return response except Exception as pdf_error: # Если возникла ошибка при генерации PDF, логируем ее app.logger.error(f"PDF generation error: {str(pdf_error)}") app.logger.error(traceback.format_exc()) # Показываем печатную версию в браузере return render_template( 'report_printable.html', vin=vin, det=det, his=his, report_date=report_date, report_id=report_id, current_year=current_year, error_message=str(pdf_error) ) except Exception as e: app.logger.error(traceback.format_exc()) return 'Report generation failed: ' + str(e), 500 if __name__ == '__main__': # Start a pool of connections pool = start_pool() app.run(port=int(os.environ.get('PORT', '8080')))