Security hardening: env SECRET_KEY, rate limiting, upload validation, Stripe fixes

- SECRET_KEY desde variable de entorno (warn si no configurado)
- Rutas absolutas para DB, logos y PDFs
- MAX_CONTENT_LENGTH = 16 MB
- Validación de extensión/tipo en subida de logos y firmas
- _doc_list_page: clientes y productos filtrados por empresa del usuario
- login: rate limiting (10 intentos / 15 min por IP)
- load_user: db.session.get() (SQLAlchemy 2.x compatible)
- pay_success: verifica sesión Stripe antes de marcar factura como pagada
- stripe_checkout: api_key por llamada (thread-safe, elimina global mutable)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-05 02:13:59 -04:00
parent 35d460b127
commit 764b72a318
+76 -15
View File
@@ -4,7 +4,7 @@ from flask_login import LoginManager, UserMixin, login_user, logout_user, login_
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
from datetime import datetime
import os, json, smtplib, re, secrets
import os, json, smtplib, re, secrets, time
try:
import requests as http_requests
HAS_REQUESTS = True
@@ -40,13 +40,44 @@ from email.mime.text import MIMEText
from email import encoders
app = Flask(__name__)
app.config['SECRET_KEY'] = 'marineinvoice-secret-key-2024'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///marineinvoice.db'
# ── Security: SECRET_KEY desde variable de entorno ───────────────────────────
_secret_key = os.environ.get('SECRET_KEY')
if not _secret_key:
_secret_key = 'marineinvoice-secret-key-2024'
print('⚠️ WARNING: SECRET_KEY no configurado en variables de entorno. '
'Crea un archivo .env con SECRET_KEY=<clave aleatoria> antes de producción.')
app.config['SECRET_KEY'] = _secret_key
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(BASE_DIR, "instance", "marineinvoice.db")}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'static/logos'
app.config['PDF_FOLDER'] = 'static/pdfs'
os.makedirs('static/logos', exist_ok=True)
os.makedirs('static/pdfs', exist_ok=True)
app.config['UPLOAD_FOLDER'] = os.path.join(BASE_DIR, 'static', 'logos')
app.config['PDF_FOLDER'] = os.path.join(BASE_DIR, 'static', 'pdfs')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB máximo por request
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['PDF_FOLDER'], exist_ok=True)
os.makedirs(os.path.join(BASE_DIR, 'instance'), exist_ok=True)
# Extensiones permitidas en subidas de archivos
ALLOWED_IMG_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
def _allowed_image(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_IMG_EXTENSIONS
# ── Rate-limiting simple para login (sin dependencias externas) ──────────────
_login_attempts: dict = {} # ip -> [timestamps]
_LOGIN_MAX = 10 # intentos máximos
_LOGIN_WINDOW = 900 # ventana de 15 minutos
def _is_rate_limited(ip: str) -> bool:
now = time.time()
attempts = [t for t in _login_attempts.get(ip, []) if now - t < _LOGIN_WINDOW]
_login_attempts[ip] = attempts
return len(attempts) >= _LOGIN_MAX
def _record_failed_login(ip: str):
_login_attempts.setdefault(ip, []).append(time.time())
db = SQLAlchemy(app)
login_manager = LoginManager(app)
@@ -203,7 +234,7 @@ class Document(db.Model):
creator = db.relationship('User', backref='documents', lazy=True, foreign_keys=[created_by])
@login_manager.user_loader
def load_user(user_id): return User.query.get(int(user_id))
def load_user(user_id): return db.session.get(User, int(user_id))
# ============================================================
# AUTH
@@ -212,12 +243,17 @@ def load_user(user_id): return User.query.get(int(user_id))
def login():
if current_user.is_authenticated: return redirect(url_for('dashboard'))
if request.method == 'POST':
ip = request.remote_addr or '0.0.0.0'
if _is_rate_limited(ip):
flash('Demasiados intentos fallidos. Espera 15 minutos.', 'error')
return render_template('login.html')
u = request.form.get('username','').strip()
p = request.form.get('password','')
user = User.query.filter_by(username=u, active=True).first()
if user and user.check_password(p):
login_user(user, remember=True)
return redirect(url_for('dashboard'))
_record_failed_login(ip)
flash('Usuario o contraseña incorrectos', 'error')
return render_template('login.html')
@@ -273,6 +309,9 @@ def new_company():
if 'logo' in request.files:
f = request.files['logo']
if f and f.filename:
if not _allowed_image(f.filename):
flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error')
return render_template('company_form.html', company=None)
fn = secure_filename(f.filename)
f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn))
logo_path = f'logos/{fn}'
@@ -313,12 +352,18 @@ def edit_company(id):
if 'logo' in request.files:
f = request.files['logo']
if f and f.filename:
if not _allowed_image(f.filename):
flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error')
return render_template('company_form.html', company=c)
fn = secure_filename(f.filename)
f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn))
c.logo_path = f'logos/{fn}'
if 'signature' in request.files:
f = request.files['signature']
if f and f.filename:
if not _allowed_image(f.filename):
flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error')
return render_template('company_form.html', company=c)
fn = 'sig_' + secure_filename(f.filename)
f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn))
c.signature_path = f'logos/{fn}'
@@ -542,8 +587,12 @@ def _doc_list_page(doc_type):
else:
docs = Document.query.filter_by(doc_type=doc_type, company_id=current_user.company_id).order_by(Document.created_at.desc()).all()
companies = Company.query.filter_by(active=True).all()
clients = Client.query.filter_by(active=True).all()
products = Product.query.filter_by(active=True).all()
if current_user.is_superadmin():
clients = Client.query.filter_by(active=True).all()
products = Product.query.filter_by(active=True).all()
else:
clients = Client.query.filter_by(company_id=current_user.company_id, active=True).all()
products = Product.query.filter_by(company_id=current_user.company_id, active=True).all()
return render_template('documents.html', docs=docs, doc_type=doc_type,
companies=companies, clients=clients, products=products)
@@ -962,11 +1011,11 @@ def stripe_checkout(token):
company = Company.query.get(doc.company_id)
if not company or not company.stripe_secret_key:
return 'Stripe no configurado', 400
stripe.api_key = company.stripe_secret_key
invoice_cents = int(round(doc.total * 100))
fee_cents = int(round((doc.total * 0.029 + 0.30) * 100))
base = get_public_base_url()
try:
# Pasar api_key por llamada en lugar de asignar el global (thread-safe)
session = stripe.checkout.Session.create(
payment_method_types=['card'],
line_items=[
@@ -990,7 +1039,8 @@ def stripe_checkout(token):
mode='payment',
success_url=f'{base}/pay/{token}/success?session_id={{CHECKOUT_SESSION_ID}}',
cancel_url=f'{base}/pay/{token}',
metadata={'invoice_id': doc.id, 'token': token}
metadata={'invoice_id': doc.id, 'token': token},
api_key=company.stripe_secret_key, # per-request, thread-safe
)
return redirect(session.url)
except Exception as e:
@@ -999,10 +1049,21 @@ def stripe_checkout(token):
@app.route('/pay/<token>/success')
def pay_success(token):
doc = Document.query.filter_by(payment_token=token).first_or_404()
doc.status = 'paid'
db.session.commit()
company = Company.query.get(doc.company_id)
return render_template('pay_success.html', doc=doc, company=company, already_paid=False)
session_id = request.args.get('session_id', '')
if doc.status != 'paid':
# Verificar con Stripe antes de marcar como pagado
if stripe and company and company.stripe_secret_key and session_id:
try:
sess = stripe.checkout.Session.retrieve(
session_id, api_key=company.stripe_secret_key)
if sess.payment_status == 'paid':
doc.status = 'paid'
db.session.commit()
except Exception:
pass # El webhook es el canal confiable; aquí solo confirmamos
already_paid = (doc.status == 'paid')
return render_template('pay_success.html', doc=doc, company=company, already_paid=already_paid)
@app.route('/stripe/webhook', methods=['POST'])
def stripe_webhook():