salvagedb_web/app.py

1216 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, send_from_directory, make_response, request, redirect, session, g, json, send_file, abort, jsonify
import os
import random
import socket
import oracledb
import traceback
import requests
from logging.config import dictConfig
import secrets
from werkzeug.middleware.proxy_fix import ProxyFix
import io
import json
from expiring_dict import ExpiringDict
import uuid
from flask_swagger_ui import get_swaggerui_blueprint
import datetime
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch, mm
from reportlab.pdfgen.canvas import Canvas
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT
from reportlab.platypus import Frame
import sys
import logging
capcha_score: float = os.environ.get('CAPCHA_SCORE',0.5)
capcha_site = '6LcJpHMgAAAAAMQLNY_g8J2Kv_qmCGureRN_lbGl'
capcha_site_sec = '6LcJpHMgAAAAAIUf4Jg_7NvawQKZoLoVypDU6-d8'
capcha_site_url='https://www.google.com/recaptcha/api/siteverify'
site = 'salvagedb.com'
app_path = os.path.dirname(os.path.realpath(__file__))
app = Flask(__name__)
app_debug : bool = os.environ.get('APP_DEBUG',False)
app.debug = os.environ.get('APP_DEBUG',False)
app.secret_key = secrets.token_hex()
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
os.environ['NLS_LANG'] = 'American_America.AL32UTF8'
app.cache = ExpiringDict(60*60*24)
# Swagger UI
SWAGGER_URL = '/api/docs'
API_URL = '/api/swagger.yaml'
swaggerui_blueprint = get_swaggerui_blueprint(
SWAGGER_URL,
API_URL,
config={
'app_name': "SalvageDB API"
}
)
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)
try:
gips = open('gips.txt').read().split('\n')
except:
gips=[]
if app_debug:
print(gips)
def save_request(request):
req_data = {}
req_data['endpoint'] = request.endpoint
req_data['method'] = request.method
req_data['cookies'] = request.cookies
req_data['data'] = request.data
req_data['headers'] = dict(request.headers)
req_data['headers'].pop('Cookie', None)
req_data['args'] = request.args
req_data['form'] = request.form
req_data['remote_addr'] = request.remote_addr
return req_data
# Создаем директорию для логов если её нет
log_dir = os.path.join(app_path, 'logs')
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# Определяем уровень логирования в зависимости от окружения
log_level = 'DEBUG' if app_debug else 'INFO'
dictConfig({
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "[%(asctime)s] [%(levelname)s] [%(name)s:%(lineno)d] %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S"
},
"error": {
"format": "[%(asctime)s] [%(levelname)s] [%(name)s:%(lineno)d] [%(pathname)s] %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "default",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "default",
"filename": os.path.join(log_dir, "app.log"),
"maxBytes": 10485760, # 10MB
"backupCount": 10,
"encoding": "utf8"
},
"error_file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "error",
"filename": os.path.join(log_dir, "error.log"),
"maxBytes": 10485760, # 10MB
"backupCount": 10,
"encoding": "utf8",
"level": "ERROR"
}
},
"loggers": {
"werkzeug": {
"handlers": ["console", "file"],
"level": log_level,
"propagate": False
},
"flask": {
"handlers": ["console", "file"],
"level": log_level,
"propagate": False
},
"app": {
"handlers": ["console", "file", "error_file"],
"level": log_level,
"propagate": False
}
},
"root": {
"level": log_level,
"handlers": ["console", "file", "error_file"]
}
})
# Создаем логгер для приложения
logger = logging.getLogger("app")
class OverlayCanvas(Canvas):
def __init__(self, *args, **kwargs):
Canvas.__init__(self, *args, **kwargs)
self._saved_page_states = []
def showPage(self):
self._saved_page_states.append(dict(self.__dict__))
self._startPage()
def save(self):
for state in self._saved_page_states:
self.__dict__.update(state)
self.draw_overlay() # теперь рисуем поверх
Canvas.showPage(self)
Canvas.save(self)
def draw_overlay(self):
stamp_path = os.path.join(app_path, 'static', 'stamp256.png')
if os.path.exists(stamp_path):
self.drawImage(
stamp_path,
x=400, # настроить по ширине
y=400, # настроить по высоте
width=100,
height=100,
mask='auto'
)
@app.after_request
def after_request(response):
if request.cookies.get('user_id', None) == None:
response.set_cookie('user_id', str(random.randint(0, 10000000000)), max_age=60 * 60 * 24 * 365 * 10)
return response
def init_session(connection, requestedTag_ignored):
pass
# cursor = connection.cursor()
# cursor.execute("""
# ALTER SESSION SET
# TIME_ZONE = 'UTC'
# NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI'""")
def start_pool():
pool_min = 4
pool_max = 4
pool_inc = 0
if app_debug:
print('Connecting to {}:{}/{}'.format(os.environ.get("DB_HOST"), os.environ.get("DB_PORT"), os.environ.get("DB_DBNAME")))
pool = oracledb.create_pool(
user=os.environ.get("DB_USERNAME"),
password=os.environ.get("DB_PASSWORD"),
dsn='{}:{}/{}'.format(os.environ.get("DB_HOST"), os.environ.get("DB_PORT"), os.environ.get("DB_DBNAME")),
#params=sample_env.get_pool_params(),
min=pool_min,
max=pool_max,
increment=pool_inc,
session_callback=init_session,
)
return pool
@app.route("/sitemap.xml")
def sitemap():
conn = pool.acquire()
cur = conn.cursor()
cur.execute('select distinct trunc(pg/40000)+1 nm from mv_pages')
nm = cur.fetchall()
return render_template('sitemap.xml', site=site, siten=nm)
@app.route("/sitemaps/<int:sitemap_id>.xml")
def sitemaps_xml(sitemap_id):
try:
id: int = int(sitemap_id)
conn = pool.acquire()
cur = conn.cursor()
cur.execute('select pg from mv_pages where sitemap_number = :p1', {'p1': id})
sitemap_pages = cur.fetchall()
return render_template('sitemaps.xml', site=site, sitemap_pages=sitemap_pages)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/")
def index_html():
try:
if 'maxnum' in app.cache:
if app_debug:
print('Find in cache max_num={}'.format(app.cache['maxnum']))
cnt = app.cache['maxnum']
else:
conn = pool.acquire()
cur = conn.cursor()
cur.execute('select max(num) from salvagedb')
cnt = "{:,}".format(cur.fetchone()[0])
app.cache['maxnum'] = cnt
return render_template('index.html', site=site, cnt=cnt ,capcha_site=capcha_site)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/privacy.html")
def privacy_html():
try:
return render_template('privacy.html', site=site)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/robots.txt")
def robot_txt():
try:
return render_template('robots.txt', site=site)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/decode", methods = ['POST'])
def decode():
try:
vin = request.form.get('q').strip()
g_respone = request.form['g-recaptcha-response']
capcha_check = requests.post(url=f'{capcha_site_url}?secret={capcha_site_sec}&response={g_respone}').json()
if capcha_check['success'] == False or capcha_check['score'] <capcha_score:
logger.info(f'Google reuest: {capcha_site_url}?secret={capcha_site_sec}&response={g_respone}')
logger.info(f'Bad google answer: {capcha_check}')
abort(401)
return redirect(f'/detail/{vin}.html', 301)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/decodevin.html")
def decodevin_html():
try:
return render_template('decodevin.html', site=site,capcha_site=capcha_site)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/database/page<int:page_num>.html")
def database_page_prc(page_num):
try:
pg = int(page_num)
conn = pool.acquire()
cur = conn.cursor()
user_ip = get_ip(request) ## определение ip клиента
try:
returnVal = cur.callfunc("checkip", int, [user_ip, request.headers.get("CF-IPCountry", 'None'), get_addr(user_ip), 1, 0, 0])
except:
print(request)
logger.error(traceback.format_exc())
cur.execute('select max(pg) from mv_pages')
max_page = int(cur.fetchall()[0][0])
cur.execute("""select rownum,s.vin,
COALESCE ((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make,
COALESCE ((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model,
COALESCE ((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year
from salvagedb s
where num between (select mi from mv_pages where pg = :p1) and (select ma from mv_pages where pg = :p1)""",
{'p1': pg})
pgf = cur.fetchall()
return render_template('database.html', site=site, cur_page=pg, max_page=max_page, pg=pgf)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/detail/<vin>.html")
def detail_vin(vin):
try:
conn = pool.acquire()
cur = conn.cursor()
user_ip = get_ip(request) ## определение ip клиента
try:
returnVal = cur.callfunc("checkip", int, [user_ip, request.headers.get("CF-IPCountry", 'None'), get_addr(user_ip), 0, 0, 1])
if returnVal == 1:
return redirect('/rate_limit.html', 301)
except Exception as e:
print(request)
logger.error(traceback.format_exc())
cur.execute("""select 'None', COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='5'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Body Class'),'UNKNOWN') body,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='13'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Model'),'UNKNOWN') engine,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='9'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Number of Cylinders'),'UNKNOWN') celindr,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='15'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Drive Type'),'-') drive
from (select substr(:p1,1,10) svin, :p1 vin from dual) s """, {'p1': vin})
res = cur.fetchall()
return render_template('details.html', site=site, vin=vin, det=res, capcha_site=capcha_site)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route('/search', methods = ['POST'])
def search():
try:
user_ip = get_ip(request) ## определение ip клиента
vin = request.form.get('q')
ua = request.headers.get('User-Agent')
logger.info(f'AgeNt: {ua}')
g_respone = request.form.get('g-recaptcha-response')
capcha_check = requests.post(url=f'{capcha_site_url}?secret={capcha_site_sec}&response={g_respone}').json()
if capcha_check['success'] == False or capcha_check['score'] <capcha_score:
logger.info(f'Google reuest: {capcha_site_url}?secret={capcha_site_sec}&response={g_respone}')
logger.info(f'Bad google answer: {capcha_check}')
if app_debug==True:
req_data = save_request(request)
logger.info(json.dumps(req_data, indent=4, default=str))
return 'google recaptcha req low score', 401
if len(vin) != 17:
return 'bad vin!', 500
conn = pool.acquire()
cur = conn.cursor()
returnVal = cur.callfunc("checkip", int, [user_ip, request.headers.get("CF-IPCountry", 'None'), get_addr(user_ip), 0, 1, 0])
if returnVal == 1:
return redirect('/rate_limit.html', 301)
cur.execute("""select 'None', COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='5'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Body Class'),'UNKNOWN') body,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='13'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Model'),'UNKNOWN') engine,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='9'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Number of Cylinders'),'UNKNOWN') celindr,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='15'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Drive Type'),'-') drive
from (select substr(:p1,1,10) svin, :p1 vin from dual) s """, {'p1': vin})
res = cur.fetchall()
cur.execute("""select rownum, t.vin, t.title, t.odo, t.odos, t.dem1, t.dem2, t.year||'/'||t.month from salvagedb t where vin = :p1 and svin = substr(:p1,1,10) """, {'p1': vin})
his = cur.fetchall()
# Сохраняем VIN в сессии
session['last_searched_vin'] = vin
session['last_search_time'] = datetime.datetime.now().timestamp()
return render_template('search.html', site=site, vin=vin, det=res, his=his)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/rate_limit.html")
def rate_limit_html():
return render_template('rate_limit.html', site=site)
## API
@app.route("/api/search")
def api_search():
try:
access_code = request.args.get('access_code', None)
vin = request.args.get('vin', None)
user_ip = get_ip(request) ## определение ip клиента
# базовые проверки входящих аргументов
if len(vin) != 17 or vin is None:
ret = {'status': 'incorrect vin'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
if len(access_code) > 16 or access_code is None:
ret = {'status': 'incorrect access_code'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
conn = pool.acquire()
cur = conn.cursor()
## проверяем access_code
cur.execute(
'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1',
{'p1': str(access_code)})
res = cur.fetchone()
summ = res[0]
found_price = res[1]
notfound_price = res[2]
decode_price = res[3]
### не достаточно средств
if summ <= 0:
ret = {'status': 'You have insufficient balance.'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
## ищем в истории
cur.execute('select t.odo,t.odos,t.title,t.dem1,t.dem2,t.year,t.month from salvagedb t where vin =:p1 and svin = :p2',
{'p1': vin.upper(), 'p2': vin.upper()[:10]})
res = cur.fetchall()
if len(res) > 0:
# found
dat = []
for it in res:
dat.append({
'odometer': it[0],
'odometer_status': it[1],
'title': it[2],
'damage1': it[3],
'damage2': it[4],
'add_to_db': '{}-{}'.format(it[5], it[6])
})
ret = {
'status': 'found',
'vin': vin.upper(),
'cost': found_price,
'records': dat
}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)",
{'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'FOUND', 'p5': found_price})
conn.commit()
return response
else:
# nor found
ret = {'status': 'NOT FOUND', 'cost': notfound_price}
if app.debug:
logger.debug(json.dumps(ret))
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)",
{'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'NOT FOUND', 'p5': notfound_price})
conn.commit()
return response
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/api/restfact")
def api_restfact():
access_code = request.args.get('access_code', None)
if len(access_code) > 16 or access_code is None:
ret = {'status': 'incorrect access_code'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
conn = pool.acquire()
cur = conn.cursor()
cur.callproc('make_billing')
cur.execute(
'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1',
{'p1': str(access_code)})
res = cur.fetchone()
summ = res[0]
if len(res) <= 0:
ret = {'status': 'incorrect access_code'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
else:
ret = {
'access_code': access_code,
'restfact': summ
}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
@app.route("/api/restfact_detail")
def restfact_detail():
access_code = request.args.get('access_code', None)
if len(access_code) > 16 or access_code is None:
ret = {'status': 'incorrect access_code'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
conn = pool.acquire()
cur = conn.cursor()
cur.callproc('make_billing')
cur.execute('select rownum from restfact t where access_code = :p1', {'p1': str(access_code)})
res = cur.fetchall()
if len(res) <= 0:
ret = {'status': 'incorrect access_code'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
cur.execute("select vin, ip, status, dt, cost from billing where access_code = :p1 and dt > CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE) - 45 and made = 1 order by id desc", {'p1': str(access_code)})
res = cur.fetchall()
if len(res) <= 0:
ret = {'status': 'billing not found'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
else:
dat = []
for it in res:
dat.append(
{
'VIN': it[0],
'IP': it[1],
'STATUS': it[2],
'DATETIME': it[3].isoformat(),
'COST': it[4]
}
)
ret = {
'access_code': access_code,
'report': dat
}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
## API V2
@app.route("/api/v2/search")
def api_search_v2():
try:
access_code = request.args.get('access_code', None)
vin = request.args.get('vin', None)
user_ip = get_ip(request) ## определение ip клиента
# базовые проверки входящих аргументов
if len(vin) != 17 or vin is None:
ret = {'status': 'incorrect vin'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
if len(access_code) > 16 or access_code is None:
ret = {'status': 'incorrect access_code'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
conn = pool.acquire()
cur = conn.cursor()
## проверяем access_code
cur.execute(
'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1',
{'p1': str(access_code)})
res = cur.fetchone()
summ = res[0]
found_price = res[1]
notfound_price = res[2]
decode_price = res[3]
### не достаточно средств
if summ <= 0:
ret = {'status': 'You have insufficient balance.'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
## ищем в истории
cur.execute('''select s.odo,s.odos,s.title,s.dem1,s.dem2,s.year,s.month, json_value(i.jdata, '$.Runs_Drive') RD_Status, json_value(i.jdata, '$.Locate') Sale_Location, json_value(i.jdata, '$.RepCost') as Repear_cost,
(select count(1) from salvage_images si where si.vin = s.vin and fn = 1) image_count from salvagedb.salvagedb s left join addinfo i on s.num = i.numid where vin =:p1 and svin = :p2''',
{'p1': vin.upper(), 'p2': vin.upper()[:10]})
res = cur.fetchall()
if len(res) > 0:
# found
dat = []
for it in res:
dat.append({
'odometer': it[0],
'odometer_status': it[1],
'title': it[2],
'damage1': it[3],
'damage2': it[4],
'add_to_db': '{}-{}'.format(it[5], it[6]),
'RD_Status':it[7],
'Sale_Location':it[8],
'Repear_Cost':it[9],
'Photo_Count':it[10]
})
ret = {
'status': 'found',
'vin': vin.upper(),
'cost': found_price,
'records': dat
}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)",
{'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'FOUND', 'p5': found_price})
conn.commit()
return response
else:
# nor found
ret = {'status': 'NOT FOUND', 'cost': notfound_price}
if app.debug:
logger.debug(json.dumps(ret))
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
cur.execute("insert into billing(access_code, vin, ip, status, dt, cost) values (:p1, :p2, :p3, :p4, CAST(SYSTIMESTAMP AT TIME ZONE 'UTC' AS DATE), :p5)",
{'p1': str(access_code), 'p2': str(vin), 'p3': str(user_ip), 'p4': 'NOT FOUND', 'p5': notfound_price})
conn.commit()
return response
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/api/v2/reqphoto")
def api_reqimage():
try:
access_code = request.args.get('access_code', None)
vin = request.args.get('vin', None)
# user_ip = get_ip(request) ## определение ip клиента
# базовые проверки входящих аргументов
if len(vin) != 17 or vin is None:
ret = {'status': 'incorrect vin'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
if len(access_code) > 16 or access_code is None:
ret = {'status': 'incorrect access_code'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
conn = pool.acquire()
cur = conn.cursor()
## проверяем access_code
cur.execute(
'select t.summ, t.found_price, t.notfound_price, t.decode_price from restfact t where access_code = :p1',
{'p1': str(access_code)})
res = cur.fetchone()
summ = res[0]
found_price = res[1]
notfound_price = res[2]
decode_price = res[3]
if summ <= 0:
ret = {'status': 'You have insufficient balance.'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
cur.execute('select count(*) from salvagedb.salvage_images where vin = :p1 and fn = 1' , {'p1':vin.upper()})
res = cur.fetchone()
img_count = int(res[0])
if img_count<1:
ret = {'status': 'Photos not found for this vin.'}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
if app.debug:
logger.debug(json.dumps(ret))
return response
else:
cur.execute("""select GenImages('{}','{}') as ui from dual""".format(access_code, vin.upper()))
res = cur.fetchall()
req_id = res[0][0]
cur.execute('select fake_name from salvage_images_req where req_id = :p1', {'p1':req_id})
res = cur.fetchall()
images = []
nm = 0
for it in res:
images.append({
'num':nm,
'url':'https://salvagedb.com/simages/{}/{}'.format(req_id, it[0])
})
nm = nm + 1
ret = {'status': 'Image_found', 'Images':images}
response = app.response_class(
response=json.dumps(ret),
status=200,
mimetype='application/json'
)
return response
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route("/ads.txt")
def ads_txt():
try:
return render_template('ads.txt')
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route('/favicon.ico')
def logo():
return send_file(app_path+"/static/favicon.ico")
@app.route('/limit')
def rate_limit():
try:
return render_template('rate_limit.html', site=site)
except:
logger.error(traceback.format_exc())
return 'bad request!', 500
@app.route('/donate')
def donate():
return render_template('donate.html')
def get_ip(req) -> str:
if 'X-Forwarded-For' in req.headers:
proxy_data = req.headers['X-Forwarded-For']
ip_list = proxy_data.split(',')
user_ip = req.headers.get("Cf-Connecting-Ip") or ip_list[0] # first address in list is User IP
else:
user_ip = req.headers.get("Cf-Connecting-Ip") or req.remote_addr
return user_ip
def get_addr(req) -> str:
try:
return socket.gethostbyaddr(req)[0]
except:
return req
@app.route('/api/swagger.yaml')
def swagger_yaml():
return send_from_directory('api', 'swagger.yaml')
@app.route('/static/<path:filename>')
def serve_static(filename):
try:
# Проверка расширения файла
allowed_extensions = {'.css', '.js', '.png', '.jpg', '.jpeg', '.gif', '.ico', '.svg', '.json'}
file_ext = os.path.splitext(filename)[1].lower()
if file_ext not in allowed_extensions:
logger.warning(f'Attempt to access forbidden file type: {filename}')
return 'Access denied', 403
# Проверка пути на directory traversal
safe_path = os.path.normpath(os.path.join('static', filename))
if not safe_path.startswith('static'):
logger.warning(f'Attempt to access file outside static directory: {filename}')
return 'Access denied', 403
# Определение MIME-типа
mime_types = {
'.css': 'text/css',
'.js': 'application/javascript',
'.json': 'application/json',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.gif': 'image/gif',
'.ico': 'image/x-icon',
'.svg': 'image/svg+xml'
}
mime_type = mime_types.get(file_ext, 'application/octet-stream')
# Логирование доступа
logger.info(f'Access to static file: {filename}')
response = make_response(send_from_directory('static', filename))
response.headers['Content-Type'] = mime_type
# Специальные заголовки для PWA файлов
if filename == 'manifest.json':
response.headers['Content-Type'] = 'application/manifest+json'
elif filename == 'sw.js':
response.headers['Service-Worker-Allowed'] = '/'
return response
except Exception as e:
logger.error(f'Error accessing file {filename}: {str(e)}')
return 'File not found', 404
@app.route("/salvagereport/<string:vin>")
def generate_pdf_report(vin):
try:
# Проверяем наличие VIN в сессии и время последнего поиска
if 'last_searched_vin' not in session or session['last_searched_vin'] != vin:
logger.warning(f'Direct access attempt to report generation for VIN: {vin}')
return 'Access denied', 403
# Проверяем время последнего поиска (не более 5 минут)
if datetime.datetime.now().timestamp() - session['last_search_time'] > 300:
logger.warning(f'Report generation attempt expired for VIN: {vin}')
return 'Access denied', 403
conn = pool.acquire()
cur = conn.cursor()
user_ip = get_ip(request)
try:
returnVal = cur.callfunc("checkip", int, [user_ip, request.headers.get("CF-IPCountry", 'None'), get_addr(user_ip), 1, 0, 0])
except:
logger.error(traceback.format_exc())
# Get vehicle details
cur.execute("""select 'None', COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='26'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Make'),'UNKNOWN') make,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='28'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model'),'UNKNOWN') model,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='29'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Model Year'),'UNKNOWN') year,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='5'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Body Class'),'UNKNOWN') body,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='13'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Model'),'UNKNOWN') engine,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='9'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Engine Number of Cylinders'),'UNKNOWN') celindr,
COALESCE((select value from m_JSONS_FROM_NHTSA v3 where v3.svin =s.svin and v3.variableid ='15'),(select val from vind2 where svin = substr(s.vin, 1, 8) || '*' || substr(s.vin, 10, 2) and varb = 'Drive Type'),'-') drive
from (select substr(:p1,1,10) svin, :p1 vin from dual) s """, {'p1': vin})
det = cur.fetchall()
# Get salvage history
cur.execute("""select rownum, t.vin, t.title, t.odo, t.odos, t.dem1, t.dem2, t.year||'/'||t.month from salvagedb t where vin = :p1 and svin = substr(:p1,1,10) """, {'p1': vin})
his = cur.fetchall()
# Generate current date and report ID
now = datetime.datetime.now()
report_date = now.strftime("%Y-%m-%d") # Только дата без времени
report_id = uuid.uuid4().hex[:8].upper()
current_year = now.year
try:
# Создаем буфер для PDF
buffer = io.BytesIO()
# Создаем PDF документ
doc = SimpleDocTemplate(
buffer,
pagesize=A4,
rightMargin=20*mm,
leftMargin=20*mm,
topMargin=15*mm,
bottomMargin=20*mm
)
# Создаем стили для текста
styles = getSampleStyleSheet()
styles.add(ParagraphStyle(
name='ReportTitle',
parent=styles['Heading1'],
fontSize=16,
alignment=TA_CENTER
))
styles.add(ParagraphStyle(
name='ReportSubtitle',
parent=styles['Normal'],
fontSize=12,
alignment=TA_CENTER,
textColor=colors.HexColor('#0066CC') # Синий цвет для акцента
))
styles.add(ParagraphStyle(
name='Right',
parent=styles['Normal'],
fontSize=10,
alignment=TA_RIGHT
))
styles.add(ParagraphStyle(
name='Disclaimer',
parent=styles['Normal'],
fontSize=8,
textColor=colors.gray
))
styles.add(ParagraphStyle(
name='SalvageDBFooter',
parent=styles['Normal'],
fontSize=9,
alignment=TA_CENTER,
textColor=colors.HexColor('#0066CC'), # Синий цвет для акцента
fontName='Helvetica-Bold'
))
# Список элементов для PDF
elements = []
# Путь к логотипу
# logo_path = os.path.join(app_path, 'static', 'icons', 'icon-144x144.png')
# Добавляем логотип, если файл существует
# if os.path.exists(logo_path):
# logo_img = Image(logo_path, width=30*mm, height=30*mm)
# logo_img.hAlign = 'CENTER'
# elements.append(logo_img)
# elements.append(Spacer(1, 5*mm))
# Заголовок отчета с акцентом на SALVAGEDB.COM
elements.append(Paragraph("Vehicle History Report", styles['ReportTitle']))
elements.append(Paragraph("SALVAGEDB.COM - Comprehensive Vehicle History Check", styles['ReportSubtitle']))
elements.append(Spacer(1, 10*mm))
# Информация об отчете
elements.append(Paragraph(f"Report Date: {report_date}", styles['Right']))
elements.append(Paragraph(f"Report ID: {report_id}", styles['Right']))
elements.append(Spacer(1, 15*mm))
# Информация о транспортном средстве
elements.append(Paragraph("Vehicle Information", styles['Heading2']))
# Создаем таблицу с информацией о транспортном средстве
vehicle_data = [
["VIN", vin],
["Make", det[0][1]],
["Model", det[0][2]],
["Year", det[0][3]],
["Body Style", det[0][4]],
["Engine", det[0][5]],
["Cylinders", det[0][6]],
["Drive", det[0][7]]
]
vehicle_table = Table(vehicle_data, colWidths=[100, 350])
vehicle_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (0, -1), colors.lightgrey),
('TEXTCOLOR', (0, 0), (0, -1), colors.black),
('ALIGN', (0, 0), (0, -1), 'LEFT'),
('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (0, -1), 10),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
('BACKGROUND', (1, 0), (-1, -1), colors.white),
('GRID', (0, 0), (-1, -1), 0.5, colors.black),
]))
elements.append(vehicle_table)
elements.append(Spacer(1, 10*mm))
# Информация об истории
if his:
elements.append(Paragraph("Salvage History", styles['Heading2']))
# Создаем стиль для ячеек с переносом текста
table_style = TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
('ALIGN', (0, 0), (-1, 0), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 9),
('BOTTOMPADDING', (0, 0), (-1, 0), 6),
('BACKGROUND', (0, 1), (-1, -1), colors.white),
('GRID', (0, 0), (-1, -1), 0.5, colors.black),
('VALIGN', (0, 0), (-1, -1), 'TOP'),
('LEFTPADDING', (0, 0), (-1, -1), 3),
('RIGHTPADDING', (0, 0), (-1, -1), 3),
('FONTSIZE', (0, 1), (-1, -1), 8),
('WORDWRAP', (0, 0), (-1, -1), True),
])
# Оптимизированные заголовки без VIN
history_headers = ["Title", "Mileage", "Odometer Status", "Primary\nDamage", "Secondary\nDamage"]
# Формируем данные для таблицы истории
history_data = [history_headers]
for item in his:
title_value = item[2]
primary_damage = item[5] if item[5] else "N/A"
secondary_damage = item[6] if item[6] else "N/A"
# Добавляем строку с данными, убрав VIN
history_data.append([
title_value,
str(item[3]),
item[4],
primary_damage,
secondary_damage
])
# Устанавливаем ширину колонок оптимально для A4
available_width = doc.width # Доступная ширина страницы
col_widths = [
available_width * 0.20, # 20% для Title
available_width * 0.12, # 12% для Mileage
available_width * 0.18, # 18% для Odometer Status
available_width * 0.25, # 25% для Primary Damage
available_width * 0.25 # 25% для Secondary Damage
]
# Создаем таблицу с правильными размерами
history_table = Table(history_data, colWidths=col_widths, repeatRows=1)
history_table.setStyle(table_style)
elements.append(history_table)
else:
elements.append(Paragraph("Salvage History", styles['Heading2']))
elements.append(Paragraph("Salvage history not found.", styles['Normal']))
elements.append(Spacer(1, 15*mm))
# Добавляем штамп поверх таблицы
# stamp_path = os.path.join(app_path, 'static', 'stamp256.png')
# if os.path.exists(stamp_path):
# stamp_img = Image(stamp_path, width=100, height=100)
# stamp_img.hAlign = 'RIGHT'
# stamp_img.vAlign = 'TOP'
# # Размещаем штамп поверх таблицы
# elements.append(stamp_img)
# elements.append(Spacer(1, 15*mm))
# Дисклеймер
disclaimer_text = """
Disclaimer: This report provides information about salvage or junk vehicles; damage from hail, flood or fire;
mileage discrepancies or odometer rollback; and gray market vehicles. We do not claim that the car got in our
databank has salvage title, but the fact that it has been damaged for sure. Our site helps people avoid buying
a damaged vehicle in the past.
"""
elements.append(Paragraph(disclaimer_text, styles['Disclaimer']))
# Футер с акцентом на SALVAGEDB.COM
elements.append(Spacer(1, 15*mm))
elements.append(Paragraph(f"© {current_year} SALVAGEDB.COM - All Rights Reserved", styles['SalvageDBFooter']))
elements.append(Paragraph("Visit SALVAGEDB.COM for more information about vehicle history", styles['SalvageDBFooter']))
elements.append(Paragraph('"This report is provided as is without any guarantees or warranty."', styles['Disclaimer']))
# Строим PDF документ
# doc.build(elements, onFirstPage=add_stamp_overlay, onLaterPages=add_stamp_overlay)
doc.build(elements, canvasmaker=OverlayCanvas)
# Получаем содержимое буфера
pdf_data = buffer.getvalue()
buffer.close()
# Создаем HTTP-ответ с PDF
response = make_response(pdf_data)
response.headers['Content-Type'] = 'application/pdf'
response.headers['Content-Disposition'] = f'attachment; filename=vehicle_report_{vin}.pdf'
return response
except Exception as pdf_error:
# Если возникла ошибка при генерации PDF, логируем ее
app.logger.error(f"PDF generation error: {str(pdf_error)}")
app.logger.error(traceback.format_exc())
# Показываем печатную версию в браузере
return render_template(
'report_printable.html',
vin=vin,
det=det,
his=his,
report_date=report_date,
report_id=report_id,
current_year=current_year,
error_message=str(pdf_error)
)
except Exception as e:
app.logger.error(traceback.format_exc())
return 'Report generation failed: ' + str(e), 500
if __name__ == '__main__':
# Start a pool of connections
pool = start_pool()
app.run(port=int(os.environ.get('PORT', '8080')))