Files
alro65 ab4c9c81b0 Security hardening: env SECRET_KEY, rate limiting, input validation, ownership checks
- SECRET_KEY desde variable de entorno (warn si no configurado)
- login: rate limiting (10 intentos / 15 min) + validación next param (open redirect fix)
- update_status: allowlist de estados válidos antes de ejecutar SQL
- purchase_update_status: allowlist contra PURCHASE_STATUSES
- save/clear_signature: allowlist _SIG_COLS para col derivado del request
- upload_invoice: validación de extensión contra ALLOWED_DOCS
- update_fields, update_labor, upload_photo, add_part_to_order: ownership check (empresa)
- update_status, save/clear_signature: ownership check en WO mutations
- auth.py: contraseña admin inicial desde ADMIN_PASSWORD env var

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 02:14:04 -04:00

136 lines
5.2 KiB
Python

"""
auth.py — Login y control de acceso sin flask-login
Usa sesiones de Flask + werkzeug para hash de contraseñas
"""
from functools import wraps
from flask import session, redirect, url_for, flash, request
from werkzeug.security import generate_password_hash, check_password_hash
import sqlite3, os
# Absolute path of the SQLite database file, which lives next to this module.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(_MODULE_DIR, 'marine_maintenance.db')
def get_db():
    """Open and return a new SQLite connection to ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` objects and foreign-key enforcement
    is switched on for the connection. The connection is returned open; the
    callers in this module close it themselves.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.execute("PRAGMA foreign_keys = ON")
    connection.row_factory = sqlite3.Row
    return connection
# ── Sesión ────────────────────────────────────────────────────────────────────
def login_user(user):
    """Store the authenticated user's identity in the Flask session.

    Copies id, username, display name, role and company data from ``user``
    into ``session`` and then stamps ``users.last_login`` with the current
    database timestamp.

    Args:
        user: Row-like object indexable by column name and exposing
            ``keys()``. It must provide ``id``, ``username``, ``full_name``,
            ``role`` and ``company_id``; ``company_name`` is optional and
            stored as ``None`` when absent.
    """
    session['user_id'] = user['id']
    session['username'] = user['username']
    # Fall back to the username when no full name is recorded.
    session['full_name'] = user['full_name'] or user['username']
    session['role'] = user['role']
    session['company_id'] = user['company_id']
    session['company_name'] = user['company_name'] if 'company_name' in user.keys() else None

    # Record the login time. The connection is closed in ``finally`` so a
    # failing UPDATE/commit does not leak an open database handle.
    conn = get_db()
    try:
        conn.execute("UPDATE users SET last_login=CURRENT_TIMESTAMP WHERE id=?", (user['id'],))
        conn.commit()
    finally:
        conn.close()
def logout_user():
    """Log the current user out by emptying the whole session."""
    session.clear()
    return None
def current_user():
    """Return the logged-in user's session data as a dict, or ``None``.

    ``None`` is returned when the session has no ``user_id`` key. Otherwise
    the dict has the keys ``id``, ``username``, ``full_name``, ``role``,
    ``company_id`` and ``company_name``; any value missing from the session
    is ``None``.
    """
    if 'user_id' not in session:
        return None

    # The session stores the id under 'user_id'; every other field keeps
    # the same key in the returned dict.
    profile = {'id': session.get('user_id')}
    for field in ('username', 'full_name', 'role', 'company_id', 'company_name'):
        profile[field] = session.get(field)
    return profile
def is_logged_in():
    """Return ``True`` when the session contains a ``user_id`` key."""
    has_user = 'user_id' in session
    return has_user
def is_superadmin():
    """Return ``True`` when the session role is exactly ``'superadmin'``."""
    role = session.get('role')
    return role == 'superadmin'
def is_admin():
    """Return ``True`` when the session role is ``'superadmin'`` or ``'admin'``."""
    role = session.get('role')
    return role == 'superadmin' or role == 'admin'
# ── Decoradores ───────────────────────────────────────────────────────────────
def login_required(f):
    """Decorator: run the view only when a user is logged in.

    Anonymous requests are redirected to the ``auth_login`` endpoint, with
    the URL that was requested passed along as the ``next`` query parameter.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if is_logged_in():
            return f(*args, **kwargs)
        login_url = url_for('auth_login', next=request.url)
        return redirect(login_url)

    return wrapper
def admin_required(f):
    """Decorator: run the view only for ``admin`` or ``superadmin`` sessions.

    Anonymous requests are redirected to the ``auth_login`` endpoint with the
    requested URL in the ``next`` query parameter, matching what
    ``login_required`` does, so the user can be sent back here after signing
    in. Logged-in users without an admin role are redirected to the
    ``dashboard`` endpoint.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if not is_logged_in():
            # Keep the originally requested URL so login can return to it.
            return redirect(url_for('auth_login', next=request.url))
        if not is_admin():
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)

    return decorated
def superadmin_required(f):
    """Decorator: run the view only for ``superadmin`` sessions.

    Anonymous requests are redirected to the ``auth_login`` endpoint with the
    requested URL in the ``next`` query parameter, matching what
    ``login_required`` does, so the user can be sent back here after signing
    in. Logged-in users who are not superadmins are redirected to the
    ``dashboard`` endpoint.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if not is_logged_in():
            # Keep the originally requested URL so login can return to it.
            return redirect(url_for('auth_login', next=request.url))
        if not is_superadmin():
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)

    return decorated
# ── Filtro de compañía ────────────────────────────────────────────────────────
def company_filter():
    """Build the SQL ``WHERE`` fragment restricting rows to the user's company.

    Returns:
        A ``(sql_where, params)`` pair:
        - ``("", [])`` for a superadmin (no filtering);
        - ``("WHERE company_id = ?", [company_id])`` when the session has a
          truthy ``company_id``;
        - ``("WHERE 1=0", [])`` otherwise, so a user without a company
          matches no rows.
    """
    if is_superadmin():
        return "", []

    company_id = session.get('company_id')
    if not company_id:
        # No company assigned: the clause matches nothing.
        return "WHERE 1=0", []
    return "WHERE company_id = ?", [company_id]
def vessel_filter():
    """Build the ``WHERE`` fragment limiting vessels to the user's company.

    Same contract as a ``(sql_where, params)`` pair, but the column is
    qualified with the ``v`` table alias:
    - ``("", [])`` for a superadmin;
    - ``("WHERE v.company_id = ?", [company_id])`` when the session has a
      truthy ``company_id``;
    - ``("WHERE 1=0", [])`` otherwise.
    """
    if is_superadmin():
        return "", []

    company_id = session.get('company_id')
    if not company_id:
        # No company assigned: the clause matches nothing.
        return "WHERE 1=0", []
    return "WHERE v.company_id = ?", [company_id]
# ── Hash de contraseñas ───────────────────────────────────────────────────────
def hash_password(password):
    """Return the werkzeug hash of ``password`` using the library defaults."""
    digest = generate_password_hash(password)
    return digest
def verify_password(password, hashed):
    """Return whether plain-text ``password`` matches the stored ``hashed`` value.

    Note the argument order here (password first) is the reverse of
    werkzeug's ``check_password_hash``, which takes the hash first.
    """
    matches = check_password_hash(hashed, password)
    return matches
# ── Crear superadmin inicial ──────────────────────────────────────────────────
def create_initial_superadmin(username='admin', password=None, email='admin@marine.local'):
    """Create the first superadmin account if none exists yet.

    Does nothing when the ``users`` table already contains a row with
    ``role='superadmin'``. When an account is created, a message is printed;
    it is a warning if the default password ended up being used.

    Args:
        username: Login name for the new account.
        password: Plain-text password. When ``None`` it is read from the
            ``ADMIN_PASSWORD`` environment variable; if that variable is
            unset or empty, the insecure default ``'admin123'`` is used as a
            last resort.
        email: E-mail address stored for the account.
    """
    if password is None:
        # An empty ADMIN_PASSWORD is treated as unset so the account is
        # never created with a blank password.
        password = os.environ.get('ADMIN_PASSWORD') or 'admin123'

    conn = get_db()
    try:
        existing = conn.execute("SELECT id FROM users WHERE role='superadmin'").fetchone()
        if existing:
            return
        conn.execute("""
            INSERT INTO users (company_id, username, email, password_hash, full_name, role)
            VALUES (NULL, ?, ?, ?, 'Super Administrator', 'superadmin')
        """, (username, email, hash_password(password)))
        conn.commit()
    finally:
        # Always release the handle, including when the INSERT fails
        # (for example on a constraint violation).
        conn.close()

    if password == 'admin123':
        print("[AUTH] ⚠️ Superadmin creado con contraseña por defecto. "
              "Configura ADMIN_PASSWORD en tu .env y reinicia.")
    else:
        print(f"[AUTH] Superadmin creado: {username}")