ab4c9c81b0
- SECRET_KEY desde variable de entorno (warn si no configurado) - login: rate limiting (10 intentos / 15 min) + validación next param (open redirect fix) - update_status: allowlist de estados válidos antes de ejecutar SQL - purchase_update_status: allowlist contra PURCHASE_STATUSES - save/clear_signature: allowlist _SIG_COLS para col derivado del request - upload_invoice: validación de extensión contra ALLOWED_DOCS - update_fields, update_labor, upload_photo, add_part_to_order: ownership check (empresa) - update_status, save/clear_signature: ownership check en WO mutations - auth.py: contraseña admin inicial desde ADMIN_PASSWORD env var Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
136 lines
5.2 KiB
Python
136 lines
5.2 KiB
Python
"""
|
|
auth.py — Login y control de acceso sin flask-login
|
|
Usa sesiones de Flask + werkzeug para hash de contraseñas
|
|
"""
|
|
from functools import wraps
|
|
from flask import session, redirect, url_for, flash, request
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import sqlite3, os
|
|
|
|
# Absolute path of the SQLite database file, which sits in the same
# directory as this module.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(_MODULE_DIR, 'marine_maintenance.db')
|
|
|
|
def get_db():
    """Open a new SQLite connection to the database at ``DB_PATH``.

    The connection uses ``sqlite3.Row`` as its row factory, so columns can
    be read by name, and has foreign-key enforcement switched on. Nothing
    here closes the connection; that is left to the caller.

    Returns:
        sqlite3.Connection: a freshly opened connection.
    """
    db = sqlite3.connect(DB_PATH)
    db.row_factory = sqlite3.Row
    # Foreign-key enforcement is a per-connection setting in SQLite.
    db.execute("PRAGMA foreign_keys = ON")
    return db
|
|
|
|
# ── Sesión ────────────────────────────────────────────────────────────────────
|
|
def login_user(user):
    """Store the authenticated user in the session and record the login time.

    Args:
        user: Mapping-like user record that supports ``user[key]`` and
            ``user.keys()`` (for example a ``sqlite3.Row``). It must provide
            ``id``, ``username``, ``full_name``, ``role`` and ``company_id``.
            ``company_name`` is optional and is stored as ``None`` when the
            record has no such key.
    """
    session['user_id'] = user['id']
    session['username'] = user['username']
    # Fall back to the username when the full name is empty or NULL.
    session['full_name'] = user['full_name'] or user['username']
    session['role'] = user['role']
    session['company_id'] = user['company_id']
    session['company_name'] = (
        user['company_name'] if 'company_name' in user.keys() else None
    )

    # Update last_login. The connection is closed even when the UPDATE or
    # the commit raises, so a failing write cannot leak it.
    conn = get_db()
    try:
        conn.execute(
            "UPDATE users SET last_login=CURRENT_TIMESTAMP WHERE id=?",
            (user['id'],),
        )
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def logout_user():
    """Log the user out by removing every key from the session."""
    session.clear()
|
|
|
|
def current_user():
    """Return the logged-in user's data as stored in the session.

    Returns:
        dict | None: ``None`` when the session has no ``user_id``; otherwise
        a dict with the keys ``id``, ``username``, ``full_name``, ``role``,
        ``company_id`` and ``company_name``. Keys missing from the session
        map to ``None``.
    """
    if 'user_id' not in session:
        return None

    # The id is stored under 'user_id' in the session but exposed as 'id'.
    user = {'id': session.get('user_id')}
    for key in ('username', 'full_name', 'role', 'company_id', 'company_name'):
        user[key] = session.get(key)
    return user
|
|
|
|
def is_logged_in():
    """Return ``True`` when the session contains a ``user_id`` key."""
    logged_in = 'user_id' in session
    return logged_in
|
|
|
|
def is_superadmin():
    """Return ``True`` when the session role is exactly ``'superadmin'``."""
    role = session.get('role')
    return role == 'superadmin'
|
|
|
|
def is_admin():
    """Return ``True`` when the session role is ``'superadmin'`` or ``'admin'``."""
    role = session.get('role')
    return role == 'superadmin' or role == 'admin'
|
|
|
|
# ── Decoradores ───────────────────────────────────────────────────────────────
|
|
def login_required(f):
    """Decorator that only runs the wrapped callable for logged-in users.

    Anonymous requests are redirected to the ``auth_login`` endpoint with
    the current request URL passed as the ``next`` parameter.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if is_logged_in():
            return f(*args, **kwargs)
        # NOTE(review): ``next`` is presumably consumed by the login view
        # to send the user back here - confirm against that view.
        return redirect(url_for('auth_login', next=request.url))

    return wrapper
|
|
|
|
def admin_required(f):
    """Decorator that only runs the wrapped callable for admin users.

    Anonymous requests are redirected to the ``auth_login`` endpoint with
    the current request URL passed as ``next``, the same way
    ``login_required`` does, so the requested page is not lost across the
    login step. Logged-in users whose role is neither ``'superadmin'`` nor
    ``'admin'`` are redirected to the ``dashboard`` endpoint.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if not is_logged_in():
            return redirect(url_for('auth_login', next=request.url))
        if not is_admin():
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)

    return decorated
|
|
|
|
def superadmin_required(f):
    """Decorator that only runs the wrapped callable for superadmins.

    Anonymous requests are redirected to the ``auth_login`` endpoint with
    the current request URL passed as ``next``, the same way
    ``login_required`` does, so the requested page is not lost across the
    login step. Logged-in users whose role is not ``'superadmin'`` are
    redirected to the ``dashboard`` endpoint.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if not is_logged_in():
            return redirect(url_for('auth_login', next=request.url))
        if not is_superadmin():
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)

    return decorated
|
|
|
|
# ── Filtro de compañía ────────────────────────────────────────────────────────
|
|
def company_filter():
    """Build the SQL filter that limits rows to the user's company.

    Returns:
        tuple[str, list]: ``(sql_where, params)``. Superadmins get an empty
        clause with no parameters (no filtering). Users with a company get
        ``"WHERE company_id = ?"`` and their company id. Users without a
        company get ``"WHERE 1=0"``, which matches no rows.
    """
    if is_superadmin():
        return "", []

    company_id = session.get('company_id')
    if not company_id:
        # No company assigned: the user must not see anything.
        return "WHERE 1=0", []
    return "WHERE company_id = ?", [company_id]
|
|
|
|
def vessel_filter():
    """Build the WHERE clause for vessel queries, scoped to the user's company.

    The clause filters on ``v.company_id``, so the calling query has to
    define the ``v`` alias.

    Returns:
        tuple[str, list]: ``(sql_where, params)``. Superadmins get an empty
        clause with no parameters. Users with a company get
        ``"WHERE v.company_id = ?"`` and their company id. Users without a
        company get ``"WHERE 1=0"``, which matches no rows.
    """
    if is_superadmin():
        return "", []

    company_id = session.get('company_id')
    return (
        ("WHERE v.company_id = ?", [company_id])
        if company_id
        else ("WHERE 1=0", [])
    )
|
|
|
|
# ── Hash de contraseñas ───────────────────────────────────────────────────────
|
|
def hash_password(password):
    """Return the hash of ``password`` from Werkzeug's ``generate_password_hash``.

    The helper is called with its default settings.
    """
    hashed = generate_password_hash(password)
    return hashed
|
|
|
|
def verify_password(password, hashed):
    """Check a plain-text password against a stored hash.

    Args:
        password: The plain-text password to check.
        hashed: The stored password hash.

    Returns:
        The result of Werkzeug's ``check_password_hash(hashed, password)``.
    """
    # Note the argument order: Werkzeug takes the hash first.
    matches = check_password_hash(hashed, password)
    return matches
|
|
|
|
# ── Crear superadmin inicial ──────────────────────────────────────────────────
|
|
def create_initial_superadmin(username='admin', password=None, email='admin@marine.local'):
    """Create the initial superadmin account, but only if none exists yet.

    Nothing is inserted or printed when a user with role ``'superadmin'``
    is already present.

    Args:
        username: Login name for the new account.
        password: Plain-text password. When ``None`` it is read from the
            ``ADMIN_PASSWORD`` environment variable, falling back to
            ``'admin123'`` as a last resort.
        email: E-mail address stored on the account.
    """
    if password is None:
        # NOTE(review): the hard-coded fallback is a weak credential. It is
        # kept for backward compatibility and flagged by the warning below.
        password = os.environ.get('ADMIN_PASSWORD', 'admin123')

    conn = get_db()
    try:
        existing = conn.execute(
            "SELECT id FROM users WHERE role='superadmin'"
        ).fetchone()
        if not existing:
            conn.execute("""
                INSERT INTO users (company_id, username, email, password_hash, full_name, role)
                VALUES (NULL, ?, ?, ?, 'Super Administrator', 'superadmin')
            """, (username, email, hash_password(password)))
            conn.commit()
            if password == 'admin123':
                print("[AUTH] ⚠️ Superadmin creado con contraseña por defecto. "
                      "Configura ADMIN_PASSWORD en tu .env y reinicia.")
            else:
                print(f"[AUTH] Superadmin creado: {username}")
    finally:
        # Close the connection even when a query or the commit raises.
        conn.close()
|