Files
alro65 7fe7304392 Security hardening: IDOR fixes, rate limiting, secret key, session cookies
- IDOR: ownership checks on WO approve/reject/done, charter update/complete/
  send-contracts/request-insurance, captain-contract PDF, insurance-rider PDF,
  delete accounting entry, delete fuel entry, update vessel
- auth.py: rate limiting (10 req/15min), explicit is_active check
- owner.py: role guard on /owner/dashboard
- __init__.py: random SECRET_KEY if unset, absolute SQLite path, parameterized
  SQL (no f-strings), session cookie HTTPONLY+SameSite, 8h session lifetime,
  db.session.get() replacing deprecated query.get()
- api.py: P&L double-count bug fixed (wo_cost was summed twice), Content-
  Disposition filename quoted, APP_BASE_URL env var replaces hardcoded
  localhost:5010, create_management_company validates password length and
  email uniqueness, dead code removed from sync_all_accounting
- create_admin.py: removed password from console output

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 03:01:49 -04:00

164 lines
7.6 KiB
Python

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_mail import Mail
from datetime import datetime, timedelta
import os
import secrets
# Extension instances are created unbound at import time; create_app() attaches
# them to the application through their init_app() methods.
db = SQLAlchemy()
login_manager = LoginManager()
mail = Mail()
# Absolute path of the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
def create_app():
    """Build, configure and return the Flask application (application factory).

    Steps, in order:
      1. Set SECRET_KEY from the environment, or generate a random one and
         print a warning when the variable is missing or empty.
      2. Point SQLAlchemy at ``<BASE_DIR>/../instance/fleet.db`` (absolute
         path), creating the containing directory if needed.
      3. Set session-cookie options (HttpOnly, SameSite=Lax) and an 8-hour
         permanent session lifetime.
      4. Read mail settings from MAIL_* environment variables.
      5. Bind the db / login / mail extensions and register the auth, admin,
         owner and api blueprints.
      6. Register ``/`` as a redirect to the ``auth.login`` endpoint.
      7. Create all tables and run the column migrations inside an app context.

    Returns:
        The configured ``Flask`` instance.
    """
    app = Flask(__name__)

    # ── Secret key ───────────────────────────────────────────────────
    # A generated key differs on every start, so existing sessions stop
    # validating after a restart; the warning below says exactly that.
    secret_key = os.environ.get('SECRET_KEY')
    if not secret_key:
        secret_key = secrets.token_hex(32)
        print('⚠️ WARNING: SECRET_KEY no configurado — se generó uno aleatorio (sesiones no persisten entre reinicios). Configura SECRET_KEY en .env para producción.')
    app.config.update(SECRET_KEY=secret_key)

    # ── Database (absolute path) ─────────────────────────────────────
    database_file = os.path.abspath(
        os.path.join(BASE_DIR, '..', 'instance', 'fleet.db')
    )
    os.makedirs(os.path.dirname(database_file), exist_ok=True)
    app.config.update(
        SQLALCHEMY_DATABASE_URI=f'sqlite:///{database_file}',
        SQLALCHEMY_TRACK_MODIFICATIONS=False,
    )

    # ── Session cookie hardening ─────────────────────────────────────
    app.config.update(
        SESSION_COOKIE_HTTPONLY=True,
        SESSION_COOKIE_SAMESITE='Lax',
        PERMANENT_SESSION_LIFETIME=timedelta(hours=8),
    )

    # ── Mail ─────────────────────────────────────────────────────────
    env = os.environ
    app.config.update(
        MAIL_SERVER=env.get('MAIL_SERVER', 'smtp.gmail.com'),
        MAIL_PORT=int(env.get('MAIL_PORT', 587)),
        MAIL_USE_TLS=True,
        MAIL_USERNAME=env.get('MAIL_USERNAME', ''),
        MAIL_PASSWORD=env.get('MAIL_PASSWORD', ''),
        # The default sender falls back to the SMTP username when not set.
        MAIL_DEFAULT_SENDER=env.get(
            'MAIL_DEFAULT_SENDER', env.get('MAIL_USERNAME', '')
        ),
    )

    # ── Extensions ───────────────────────────────────────────────────
    db.init_app(app)
    login_manager.init_app(app)
    mail.init_app(app)
    login_manager.login_view = 'auth.login'

    # ── Routes ───────────────────────────────────────────────────────
    # Imported here rather than at module top, after the extensions are bound.
    from app.routes import auth, admin, owner, api
    for routes_module in (auth, admin, owner, api):
        app.register_blueprint(routes_module.bp)

    from flask import redirect, url_for

    @app.route('/')
    def index():
        # The site root only forwards to the login page.
        return redirect(url_for('auth.login'))

    # ── Schema ───────────────────────────────────────────────────────
    # Create any missing tables, then add columns introduced after a table
    # was first created.
    with app.app_context():
        db.create_all()
        _run_migrations(db)

    return app
def _run_migrations(db):
    """Add columns missing from existing tables, then backfill derived data.

    ``db.create_all()`` creates missing tables but does not alter existing
    ones, so columns added to a model after its table was created are added
    here with ``ALTER TABLE ... ADD COLUMN``. Each statement is committed
    individually, and a column is only added when the inspector does not
    already report it, so the function is safe to run on every start.

    After the schema step, two data fixes run:
      * vessels with no ``management_company_id`` are assigned to the
        lowest-id company whose ``type`` is ``'management'``;
      * if no user has ``is_super_admin`` set, the lowest-id user with role
        ``'admin'`` is promoted.

    Args:
        db: The Flask-SQLAlchemy instance whose ``engine`` is migrated. Must
            be called inside an application context.
    """
    from sqlalchemy import inspect, text

    # (table, ((column, DDL), ...)) — applied in this order. DDL is kept as
    # complete literal statements; nothing is interpolated into SQL.
    column_migrations = (
        ('work_orders', (
            ('priority', "ALTER TABLE work_orders ADD COLUMN priority VARCHAR(20) DEFAULT 'normal'"),
            ('notified_at', "ALTER TABLE work_orders ADD COLUMN notified_at DATETIME"),
            ('approved_at', "ALTER TABLE work_orders ADD COLUMN approved_at DATETIME"),
            ('approved_by_name', "ALTER TABLE work_orders ADD COLUMN approved_by_name VARCHAR(100)"),
            ('rejected_at', "ALTER TABLE work_orders ADD COLUMN rejected_at DATETIME"),
            ('rejection_reason', "ALTER TABLE work_orders ADD COLUMN rejection_reason VARCHAR(300)"),
            ('invoice_number', "ALTER TABLE work_orders ADD COLUMN invoice_number VARCHAR(60)"),
        )),
        # charters: insurance + captain fields
        ('charters', (
            ('insurance_rider_number', "ALTER TABLE charters ADD COLUMN insurance_rider_number VARCHAR(60)"),
            ('insurer_name', "ALTER TABLE charters ADD COLUMN insurer_name VARCHAR(100)"),
            ('coverage_amount', "ALTER TABLE charters ADD COLUMN coverage_amount FLOAT"),
            ('damage_waiver', "ALTER TABLE charters ADD COLUMN damage_waiver FLOAT DEFAULT 0"),
            ('captain_id', "ALTER TABLE charters ADD COLUMN captain_id INTEGER REFERENCES captains(id)"),
        )),
        ('captains', (
            ('license_type', "ALTER TABLE captains ADD COLUMN license_type VARCHAR(20) DEFAULT 'private'"),
        )),
        # accounting_entries, fuel_entries, documents need no entry here:
        # per the original note, db.create_all() already creates them.
        ('vessels', (
            ('management_company_id', "ALTER TABLE vessels ADD COLUMN management_company_id INTEGER REFERENCES companies(id)"),
            ('max_passengers', "ALTER TABLE vessels ADD COLUMN max_passengers INTEGER DEFAULT 12"),
        )),
        ('users', (
            ('is_super_admin', "ALTER TABLE users ADD COLUMN is_super_admin BOOLEAN DEFAULT 0"),
        )),
    )

    inspector = inspect(db.engine)
    existing_tables = set(inspector.get_table_names())

    # ── Schema: add whichever columns are missing ────────────────────
    with db.engine.connect() as conn:
        for table, additions in column_migrations:
            if table not in existing_tables:
                continue
            present = {col['name'] for col in inspector.get_columns(table)}
            for column, ddl in additions:
                if column not in present:
                    conn.execute(text(ddl))
                    conn.commit()

    # ── Data: backfill management_company_id for existing vessels ────
    # Guarded like the schema step so a missing table is skipped, not fatal.
    if 'companies' in existing_tables and 'vessels' in existing_tables:
        with db.engine.connect() as conn:
            # ORDER BY makes "the" management company deterministic; without
            # it LIMIT 1 may return any matching row.
            mgmt = conn.execute(text(
                "SELECT id FROM companies WHERE type='management' ORDER BY id LIMIT 1"
            )).fetchone()
            if mgmt:
                conn.execute(
                    text("UPDATE vessels SET management_company_id = :mid WHERE management_company_id IS NULL"),
                    {"mid": mgmt[0]},
                )
                conn.commit()

    # ── Data: make sure a super admin exists ─────────────────────────
    if 'users' in existing_tables:
        with db.engine.connect() as conn:
            sup = conn.execute(text(
                "SELECT id FROM users WHERE is_super_admin=1 LIMIT 1"
            )).fetchone()
            if not sup:
                # Promote the lowest-id admin. ORDER BY keeps this privilege
                # grant reproducible instead of depending on storage order.
                first_admin = conn.execute(text(
                    "SELECT id FROM users WHERE role='admin' ORDER BY id LIMIT 1"
                )).fetchone()
                if first_admin:
                    conn.execute(
                        text("UPDATE users SET is_super_admin=1 WHERE id=:uid"),
                        {"uid": first_admin[0]},
                    )
                    conn.commit()
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login callback: resolve the ID stored in the session to a User.

    Args:
        user_id: The user identifier as stored by Flask-Login (normally a
            string holding the integer primary key).

    Returns:
        The matching ``User``, or ``None`` when the ID is not a valid integer
        or no such row exists. Returning ``None`` rather than raising lets
        Flask-Login treat the session as anonymous.
    """
    # Imported lazily, inside the callback, as in the original code.
    from app.models import User

    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # Malformed or missing ID: treat as "no such user" instead of
        # letting the request fail with an unhandled exception.
        return None
    return db.session.get(User, primary_key)