Security hardening: env SECRET_KEY, rate limiting, input validation, ownership checks

- SECRET_KEY desde variable de entorno (warn si no configurado)
- login: rate limiting (10 intentos / 15 min) + validación next param (open redirect fix)
- update_status: allowlist de estados válidos antes de ejecutar SQL
- purchase_update_status: allowlist contra PURCHASE_STATUSES
- save/clear_signature: allowlist _SIG_COLS para col derivado del request
- upload_invoice: validación de extensión contra ALLOWED_DOCS
- update_fields, update_labor, upload_photo, add_part_to_order: ownership check (empresa)
- update_status, save/clear_signature: ownership check en WO mutations
- auth.py: contraseña admin inicial desde ADMIN_PASSWORD env var

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-05 02:14:04 -04:00
parent 67a0e674ca
commit ab4c9c81b0
2 changed files with 96 additions and 10 deletions
+82 -6
View File
@@ -1,6 +1,7 @@
from flask import Flask, render_template, request, jsonify, redirect, url_for, send_file, session from flask import Flask, render_template, request, jsonify, redirect, url_for, send_file, session
import sqlite3, os, uuid import sqlite3, os, uuid, time
from datetime import datetime, date from datetime import datetime, date
from urllib.parse import urlparse
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from auth import (login_user, logout_user, current_user, is_logged_in, from auth import (login_user, logout_user, current_user, is_logged_in,
is_superadmin, is_admin, login_required, admin_required, is_superadmin, is_admin, login_required, admin_required,
@@ -8,7 +9,28 @@ from auth import (login_user, logout_user, current_user, is_logged_in,
create_initial_superadmin) create_initial_superadmin)
app = Flask(__name__) app = Flask(__name__)
app.secret_key = 'marine_maint_secret_2026_xK9p'
# ── Security: SECRET_KEY desde variable de entorno ───────────────────────────
_secret_key = os.environ.get('SECRET_KEY')
if not _secret_key:
_secret_key = 'marine_maint_secret_2026_xK9p'
print('⚠️ WARNING: SECRET_KEY no configurado en variables de entorno. '
'Crea un archivo .env con SECRET_KEY=<clave aleatoria> antes de producción.')
app.secret_key = _secret_key
# ── Rate-limiting simple para login (sin dependencias externas) ──────────────
_login_attempts: dict = {} # ip -> [timestamps]
_LOGIN_MAX = 10
_LOGIN_WINDOW = 900 # 15 minutos
def _is_rate_limited(ip: str) -> bool:
now = time.time()
attempts = [t for t in _login_attempts.get(ip, []) if now - t < _LOGIN_WINDOW]
_login_attempts[ip] = attempts
return len(attempts) >= _LOGIN_MAX
def _record_failed_login(ip: str):
_login_attempts.setdefault(ip, []).append(time.time())
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(BASE_DIR, 'marine_maintenance.db') DB_PATH = os.path.join(BASE_DIR, 'marine_maintenance.db')
@@ -268,6 +290,10 @@ def auth_login():
error = None error = None
username = '' username = ''
if request.method == 'POST': if request.method == 'POST':
ip = request.remote_addr or '0.0.0.0'
if _is_rate_limited(ip):
error = 'Demasiados intentos fallidos. Espera 15 minutos.'
return render_template('login.html', error=error, username='')
username = request.form.get('username','').strip() username = request.form.get('username','').strip()
password = request.form.get('password','') password = request.form.get('password','')
conn = get_db() conn = get_db()
@@ -279,7 +305,13 @@ def auth_login():
conn.close() conn.close()
if user and verify_password(password, user['password_hash']): if user and verify_password(password, user['password_hash']):
login_user(user) login_user(user)
return redirect(request.args.get('next') or url_for('dashboard')) # Validar next para prevenir open redirect
next_url = request.args.get('next') or url_for('dashboard')
parsed = urlparse(next_url)
if parsed.netloc and parsed.netloc != request.host:
next_url = url_for('dashboard')
return redirect(next_url)
_record_failed_login(ip)
error = 'Usuario o contraseña incorrectos.' error = 'Usuario o contraseña incorrectos.'
return render_template('login.html', error=error, username=username) return render_template('login.html', error=error, username=username)
@@ -814,11 +846,30 @@ def work_order_detail(woid):
vessel_owner_email=conn2_data.get('owner_email',''), vessel_owner_email=conn2_data.get('owner_email',''),
vessel_owner_name=conn2_data.get('owner_name','')) vessel_owner_name=conn2_data.get('owner_name',''))
_VALID_WO_STATUSES = {'pending', 'in_progress', 'completed', 'cancelled', 'on_hold'}
def _check_wo_access(conn, woid):
"""Verifica que el WO pertenece a la empresa del usuario actual."""
company_id = cid()
if company_id is None: # superadmin ve todo
return True
row = conn.execute("""
SELECT wo.id FROM work_orders wo
JOIN vessels v ON wo.vessel_id = v.id
WHERE wo.id=? AND v.company_id=?
""", (woid, company_id)).fetchone()
return row is not None
@app.route('/work-orders/<int:woid>/update-status', methods=['POST']) @app.route('/work-orders/<int:woid>/update-status', methods=['POST'])
@login_required @login_required
def update_status(woid): def update_status(woid):
status = request.json.get('status') status = request.json.get('status')
if status not in _VALID_WO_STATUSES:
return jsonify({'error': 'Estado inválido'}), 400
conn = get_db() conn = get_db()
if not _check_wo_access(conn, woid):
conn.close()
return jsonify({'error': 'No autorizado'}), 403
extra = ", end_date=date('now')" if status == 'completed' else "" extra = ", end_date=date('now')" if status == 'completed' else ""
conn.execute(f"UPDATE work_orders SET status=?,updated_at=CURRENT_TIMESTAMP{extra} WHERE id=?", (status, woid)) conn.execute(f"UPDATE work_orders SET status=?,updated_at=CURRENT_TIMESTAMP{extra} WHERE id=?", (status, woid))
conn.commit(); conn.close() conn.commit(); conn.close()
@@ -874,6 +925,8 @@ def update_status(woid):
def update_fields(woid): def update_fields(woid):
data = request.json data = request.json
conn = get_db() conn = get_db()
if not _check_wo_access(conn, woid):
conn.close(); return jsonify({'error': 'No autorizado'}), 403
conn.execute("""UPDATE work_orders SET root_cause=?,repairs_done=?, conn.execute("""UPDATE work_orders SET root_cause=?,repairs_done=?,
updated_at=CURRENT_TIMESTAMP WHERE id=?""", updated_at=CURRENT_TIMESTAMP WHERE id=?""",
(data.get('root_cause',''),data.get('repairs_done',''),woid)) (data.get('root_cause',''),data.get('repairs_done',''),woid))
@@ -885,6 +938,8 @@ def update_fields(woid):
def update_labor(woid): def update_labor(woid):
data = request.json data = request.json
conn = get_db() conn = get_db()
if not _check_wo_access(conn, woid):
conn.close(); return jsonify({'error': 'No autorizado'}), 403
conn.execute("""UPDATE work_orders SET labor_hours=?, labor_rate=?, conn.execute("""UPDATE work_orders SET labor_hours=?, labor_rate=?,
updated_at=CURRENT_TIMESTAMP WHERE id=?""", updated_at=CURRENT_TIMESTAMP WHERE id=?""",
(float(data.get('labor_hours', 0)), (float(data.get('labor_hours', 0)),
@@ -896,6 +951,10 @@ def update_labor(woid):
@login_required @login_required
def upload_photo(woid): def upload_photo(woid):
if 'photo' not in request.files: return jsonify({'error':'No file'}),400 if 'photo' not in request.files: return jsonify({'error':'No file'}),400
conn_chk = get_db()
if not _check_wo_access(conn_chk, woid):
conn_chk.close(); return jsonify({'error': 'No autorizado'}), 403
conn_chk.close()
f = request.files['photo'] f = request.files['photo']
if f and allowed_file(f.filename, ALLOWED_IMG): if f and allowed_file(f.filename, ALLOWED_IMG):
ext = f.filename.rsplit('.',1)[1].lower() ext = f.filename.rsplit('.',1)[1].lower()
@@ -916,6 +975,8 @@ def add_part_to_order(woid):
qty = float(data.get('quantity',1)) qty = float(data.get('quantity',1))
cost = float(data.get('unit_cost',0)) cost = float(data.get('unit_cost',0))
conn = get_db() conn = get_db()
if not _check_wo_access(conn, woid):
conn.close(); return jsonify({'error': 'No autorizado'}), 403
conn.execute("INSERT INTO work_order_parts (work_order_id,part_id,description,quantity,unit_cost) VALUES (?,?,?,?,?)", conn.execute("INSERT INTO work_order_parts (work_order_id,part_id,description,quantity,unit_cost) VALUES (?,?,?,?,?)",
(woid,part_id or None,data.get('description',''),qty,cost)) (woid,part_id or None,data.get('description',''),qty,cost))
if part_id: if part_id:
@@ -1164,6 +1225,8 @@ def purchase_detail(pid):
def purchase_update_status(pid): def purchase_update_status(pid):
data = request.json data = request.json
new_status = data.get('status') new_status = data.get('status')
if new_status not in PURCHASE_STATUSES:
return jsonify({'error': 'Estado inválido'}), 400
conn = get_db() conn = get_db()
extra_fields = "" extra_fields = ""
extra_vals = [] extra_vals = []
@@ -1200,6 +1263,8 @@ def upload_invoice(pid):
f = request.files.get('invoice') f = request.files.get('invoice')
if not f or not f.filename: if not f or not f.filename:
return jsonify({'ok': False, 'error': 'No file'}) return jsonify({'ok': False, 'error': 'No file'})
if not allowed_file(f.filename, ALLOWED_DOCS):
return jsonify({'ok': False, 'error': 'Tipo de archivo no permitido'})
ext = f.filename.rsplit('.',1)[-1].lower() ext = f.filename.rsplit('.',1)[-1].lower()
fname = f"invoice_{pid}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{ext}" fname = f"invoice_{pid}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{ext}"
f.save(os.path.join(UPLOAD_FOLDER, fname)) f.save(os.path.join(UPLOAD_FOLDER, fname))
@@ -1494,6 +1559,8 @@ def api_wo_equipment(woid):
return jsonify([dict(i) for i in items]) return jsonify([dict(i) for i in items])
# ── SIGNATURES ─────────────────────────────────────────────────────────────── # ── SIGNATURES ───────────────────────────────────────────────────────────────
_SIG_COLS = {'tech': 'signature_tech', 'client': 'signature_client'}
@app.route('/work-orders/<int:woid>/save-signature', methods=['POST']) @app.route('/work-orders/<int:woid>/save-signature', methods=['POST'])
@login_required @login_required
def save_signature(woid): def save_signature(woid):
@@ -1503,6 +1570,10 @@ def save_signature(woid):
dataurl = data.get('dataUrl', '') dataurl = data.get('dataUrl', '')
if not who or not dataurl: if not who or not dataurl:
return jsonify({'error': 'Missing data'}), 400 return jsonify({'error': 'Missing data'}), 400
# Allowlist: solo 'tech' o 'client' son válidos
col = _SIG_COLS.get(who)
if not col:
return jsonify({'error': 'Parámetro who inválido'}), 400
# Strip header: data:image/png;base64,.... # Strip header: data:image/png;base64,....
match = re.match(r'data:image/\w+;base64,(.*)', dataurl, re.DOTALL) match = re.match(r'data:image/\w+;base64,(.*)', dataurl, re.DOTALL)
if not match: if not match:
@@ -1511,9 +1582,10 @@ def save_signature(woid):
fname = f"sig_{woid}_{who}_{uuid.uuid4().hex[:6]}.png" fname = f"sig_{woid}_{who}_{uuid.uuid4().hex[:6]}.png"
with open(os.path.join(SIG_FOLDER, fname), 'wb') as f: with open(os.path.join(SIG_FOLDER, fname), 'wb') as f:
f.write(img_bytes) f.write(img_bytes)
col = 'signature_tech' if who == 'tech' else 'signature_client'
conn = get_db() conn = get_db()
# Delete old file if exists if not _check_wo_access(conn, woid):
conn.close(); return jsonify({'error': 'No autorizado'}), 403
# Delete old file if exists (col ya está validado contra allowlist)
old = conn.execute(f"SELECT {col} FROM work_orders WHERE id=?", (woid,)).fetchone() old = conn.execute(f"SELECT {col} FROM work_orders WHERE id=?", (woid,)).fetchone()
if old and old[col]: if old and old[col]:
op = os.path.join(SIG_FOLDER, old[col]) op = os.path.join(SIG_FOLDER, old[col])
@@ -1526,8 +1598,12 @@ def save_signature(woid):
@login_required @login_required
def clear_signature(woid): def clear_signature(woid):
who = request.json.get('who') who = request.json.get('who')
col = 'signature_tech' if who == 'tech' else 'signature_client' col = _SIG_COLS.get(who)
if not col:
return jsonify({'error': 'Parámetro who inválido'}), 400
conn = get_db() conn = get_db()
if not _check_wo_access(conn, woid):
conn.close(); return jsonify({'error': 'No autorizado'}), 403
old = conn.execute(f"SELECT {col} FROM work_orders WHERE id=?", (woid,)).fetchone() old = conn.execute(f"SELECT {col} FROM work_orders WHERE id=?", (woid,)).fetchone()
if old and old[col]: if old and old[col]:
fp = os.path.join(SIG_FOLDER, old[col]) fp = os.path.join(SIG_FOLDER, old[col])
+12 -2
View File
@@ -112,7 +112,13 @@ def verify_password(password, hashed):
return check_password_hash(hashed, password) return check_password_hash(hashed, password)
# ── Crear superadmin inicial ────────────────────────────────────────────────── # ── Crear superadmin inicial ──────────────────────────────────────────────────
def create_initial_superadmin(username='admin', password='admin123', email='admin@marine.local'): def create_initial_superadmin(username='admin', password=None, email='admin@marine.local'):
"""Crea el superadmin solo si no existe ninguno.
Password: variable de entorno ADMIN_PASSWORD, o 'admin123' como último recurso.
"""
import os
if password is None:
password = os.environ.get('ADMIN_PASSWORD', 'admin123')
conn = get_db() conn = get_db()
existing = conn.execute("SELECT id FROM users WHERE role='superadmin'").fetchone() existing = conn.execute("SELECT id FROM users WHERE role='superadmin'").fetchone()
if not existing: if not existing:
@@ -121,5 +127,9 @@ def create_initial_superadmin(username='admin', password='admin123', email='admi
VALUES (NULL, ?, ?, ?, 'Super Administrator', 'superadmin') VALUES (NULL, ?, ?, ?, 'Super Administrator', 'superadmin')
""", (username, email, hash_password(password))) """, (username, email, hash_password(password)))
conn.commit() conn.commit()
print(f"[AUTH] Superadmin creado: {username} / {password}") if password == 'admin123':
print(f"[AUTH] ⚠️ Superadmin creado con contraseña por defecto. "
f"Configura ADMIN_PASSWORD en tu .env y reinicia.")
else:
print(f"[AUTH] Superadmin creado: {username}")
conn.close() conn.close()