Files
alro65 764b72a318 Security hardening: env SECRET_KEY, rate limiting, upload validation, Stripe fixes
- SECRET_KEY desde variable de entorno (warn si no configurado)
- Rutas absolutas para DB, logos y PDFs
- MAX_CONTENT_LENGTH = 16 MB
- Validación de extensión/tipo en subida de logos y firmas
- _doc_list_page: clientes y productos filtrados por empresa del usuario
- login: rate limiting (10 intentos / 15 min por IP)
- load_user: db.session.get() (SQLAlchemy 2.x compatible)
- pay_success: verifica sesión Stripe antes de marcar factura como pagada
- stripe_checkout: api_key por llamada (thread-safe, elimina global mutable)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-05 02:13:59 -04:00

1106 lines
50 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_file, abort
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
from datetime import datetime
import os, json, smtplib, re, secrets, time
try:
import requests as http_requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
try:
import stripe
except ImportError:
stripe = None
try:
import qrcode, io, base64
HAS_QR = True
except ImportError:
HAS_QR = False
APP_BASE_URL_FALLBACK = 'http://100.96.43.86:5000' # Tailscale fallback
def get_public_base_url():
"""Detecta automáticamente la URL pública de ngrok si está corriendo,
si no usa el fallback de Tailscale."""
try:
r = http_requests.get('http://localhost:4040/api/tunnels', timeout=2)
tunnels = r.json().get('tunnels', [])
for t in tunnels:
if t.get('proto') == 'https':
return t['public_url'].rstrip('/')
except Exception:
pass
return APP_BASE_URL_FALLBACK
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email import encoders
app = Flask(__name__)
# ── Security: SECRET_KEY desde variable de entorno ───────────────────────────
_secret_key = os.environ.get('SECRET_KEY')
if not _secret_key:
_secret_key = 'marineinvoice-secret-key-2024'
print('⚠️ WARNING: SECRET_KEY no configurado en variables de entorno. '
'Crea un archivo .env con SECRET_KEY=<clave aleatoria> antes de producción.')
app.config['SECRET_KEY'] = _secret_key
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(BASE_DIR, "instance", "marineinvoice.db")}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = os.path.join(BASE_DIR, 'static', 'logos')
app.config['PDF_FOLDER'] = os.path.join(BASE_DIR, 'static', 'pdfs')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB máximo por request
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['PDF_FOLDER'], exist_ok=True)
os.makedirs(os.path.join(BASE_DIR, 'instance'), exist_ok=True)
# Extensiones permitidas en subidas de archivos
ALLOWED_IMG_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
def _allowed_image(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_IMG_EXTENSIONS
# ── Rate-limiting simple para login (sin dependencias externas) ──────────────
_login_attempts: dict = {} # ip -> [timestamps]
_LOGIN_MAX = 10 # intentos máximos
_LOGIN_WINDOW = 900 # ventana de 15 minutos
def _is_rate_limited(ip: str) -> bool:
now = time.time()
attempts = [t for t in _login_attempts.get(ip, []) if now - t < _LOGIN_WINDOW]
_login_attempts[ip] = attempts
return len(attempts) >= _LOGIN_MAX
def _record_failed_login(ip: str):
_login_attempts.setdefault(ip, []).append(time.time())
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Por favor inicia sesión para continuar.'
# ============================================================
# MODELS
# ============================================================
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(256), nullable=False)
role = db.Column(db.String(20), default='user') # superadmin, admin, user
company_id = db.Column(db.Integer, db.ForeignKey('company.id'), nullable=True)
full_name = db.Column(db.String(120))
signature = db.Column(db.Text) # base64 PNG firma del usuario
smtp_user = db.Column(db.String(120)) # email corporativo
smtp_password = db.Column(db.String(200)) # contraseña del email corporativo
email_title = db.Column(db.String(200)) # Título/cargo que aparece en el From del email
active = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def set_password(self, pw): self.password_hash = generate_password_hash(pw)
def check_password(self, pw): return check_password_hash(self.password_hash, pw)
def is_superadmin(self): return self.role == 'superadmin'
def can_access_company(self, company_id):
if self.role == 'superadmin': return True
return self.company_id == int(company_id)
class Company(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(120), nullable=False)
ein = db.Column(db.String(20))
license_num = db.Column(db.String(50))
phone = db.Column(db.String(30))
address = db.Column(db.String(200))
city = db.Column(db.String(80))
state = db.Column(db.String(30))
email = db.Column(db.String(120))
website = db.Column(db.String(120))
manager = db.Column(db.String(120))
authorized = db.Column(db.String(120))
tax_rate = db.Column(db.Float, default=7.0)
notes = db.Column(db.Text) # quote notes (backward compat)
invoice_notes = db.Column(db.Text) # notes specific to invoices
quote_notes = db.Column(db.Text) # notes specific to quotations
logo_path = db.Column(db.String(200))
signature_path = db.Column(db.String(200)) # firma guardada de la compañía
# Numbering format: prefix letters only, e.g. "IPY" for invoices, "QPY" for quotes
invoice_prefix = db.Column(db.String(10), default='INV')
quote_prefix = db.Column(db.String(10), default='QUO')
# Internal counters per month — stored as JSON: {"2024-03": 5, "2024-04": 2}
invoice_counters = db.Column(db.Text, default='{}')
quote_counters = db.Column(db.Text, default='{}')
# Email config for sending PDFs
smtp_host = db.Column(db.String(120))
smtp_port = db.Column(db.Integer, default=587)
smtp_user = db.Column(db.String(120))
smtp_password = db.Column(db.String(200))
smtp_from_name = db.Column(db.String(120))
stripe_secret_key = db.Column(db.String(200))
stripe_publishable_key = db.Column(db.String(200))
active = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
clients = db.relationship('Client', backref='company', lazy=True)
products = db.relationship('Product', backref='company', lazy=True)
documents = db.relationship('Document', backref='company', lazy=True)
users = db.relationship('User', backref='company', lazy=True)
def get_next_number(self, doc_type):
"""Get next auto number for invoice (I) or quote (Q). Does NOT increment — call increment_counter after saving."""
now = datetime.utcnow()
month_key = now.strftime('%Y-%m')
month_str = now.strftime('%m%Y')
if doc_type == 'invoice':
counters = json.loads(self.invoice_counters or '{}')
prefix = self.invoice_prefix or 'INV'
else:
counters = json.loads(self.quote_counters or '{}')
prefix = self.quote_prefix or 'QUO'
current_count = counters.get(month_key, 0) + 1
return f"{prefix}-{str(current_count).zfill(3)}-{month_str}"
def increment_counter(self, doc_type):
"""Increment the internal counter for this month."""
now = datetime.utcnow()
month_key = now.strftime('%Y-%m')
if doc_type == 'invoice':
counters = json.loads(self.invoice_counters or '{}')
counters[month_key] = counters.get(month_key, 0) + 1
self.invoice_counters = json.dumps(counters)
else:
counters = json.loads(self.quote_counters or '{}')
counters[month_key] = counters.get(month_key, 0) + 1
self.quote_counters = json.dumps(counters)
class Client(db.Model):
id = db.Column(db.Integer, primary_key=True)
company_id = db.Column(db.Integer, db.ForeignKey('company.id'), nullable=False)
name = db.Column(db.String(120), nullable=False)
contact = db.Column(db.String(120))
email = db.Column(db.String(120))
phone = db.Column(db.String(30))
address = db.Column(db.String(200))
city = db.Column(db.String(80))
state = db.Column(db.String(30))
yacht_name = db.Column(db.String(80))
yacht_info = db.Column(db.String(120))
notes = db.Column(db.Text)
active = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
class Product(db.Model):
id = db.Column(db.Integer, primary_key=True)
company_id = db.Column(db.Integer, db.ForeignKey('company.id'), nullable=False)
name = db.Column(db.String(120), nullable=False)
description = db.Column(db.Text)
price = db.Column(db.Float, nullable=False)
unit = db.Column(db.String(20), default='hr')
item_type = db.Column(db.String(20), default='service')
active = db.Column(db.Boolean, default=True)
class Document(db.Model):
"""Unified model for both Invoices and Quotes"""
id = db.Column(db.Integer, primary_key=True)
doc_type = db.Column(db.String(10), nullable=False) # 'invoice' or 'quote'
company_id = db.Column(db.Integer, db.ForeignKey('company.id'), nullable=False)
client_id = db.Column(db.Integer, db.ForeignKey('client.id'), nullable=False)
created_by = db.Column(db.Integer, db.ForeignKey('user.id'))
number = db.Column(db.String(40), nullable=False) # display number (user can adjust)
internal_seq = db.Column(db.Integer, default=0) # internal counter, never changes
date = db.Column(db.Date, nullable=False)
due_date = db.Column(db.Date)
status = db.Column(db.String(20), default='draft')
# invoice statuses: draft, sent, paid, cancelled
# quote statuses: draft, sent, accepted, rejected
language = db.Column(db.String(5), default='en')
description = db.Column(db.Text)
line_items = db.Column(db.Text) # JSON
subtotal = db.Column(db.Float, default=0)
tax_rate = db.Column(db.Float, default=7)
tax_amount = db.Column(db.Float, default=0)
total = db.Column(db.Float, default=0)
notes = db.Column(db.Text)
pdf_path = db.Column(db.String(300))
prepared_by = db.Column(db.String(120))
signed_by = db.Column(db.String(120))
signature = db.Column(db.Text)
payment_token = db.Column(db.String(64), unique=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
client = db.relationship('Client', backref='documents', lazy=True)
creator = db.relationship('User', backref='documents', lazy=True, foreign_keys=[created_by])
@login_manager.user_loader
def load_user(user_id): return db.session.get(User, int(user_id))
# ============================================================
# AUTH
# ============================================================
@app.route('/login', methods=['GET','POST'])
def login():
if current_user.is_authenticated: return redirect(url_for('dashboard'))
if request.method == 'POST':
ip = request.remote_addr or '0.0.0.0'
if _is_rate_limited(ip):
flash('Demasiados intentos fallidos. Espera 15 minutos.', 'error')
return render_template('login.html')
u = request.form.get('username','').strip()
p = request.form.get('password','')
user = User.query.filter_by(username=u, active=True).first()
if user and user.check_password(p):
login_user(user, remember=True)
return redirect(url_for('dashboard'))
_record_failed_login(ip)
flash('Usuario o contraseña incorrectos', 'error')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
# ============================================================
# DASHBOARD
# ============================================================
@app.route('/')
@login_required
def dashboard():
if current_user.is_superadmin():
companies = Company.query.filter_by(active=True).all()
total_invoices = Document.query.filter_by(doc_type='invoice').count()
total_quotes = Document.query.filter_by(doc_type='quote').count()
total_clients = Client.query.count()
total_billed = db.session.query(db.func.sum(Document.total)).filter_by(doc_type='invoice').scalar() or 0
recent_invoices = Document.query.filter_by(doc_type='invoice').order_by(Document.created_at.desc()).limit(6).all()
recent_quotes = Document.query.filter_by(doc_type='quote').order_by(Document.created_at.desc()).limit(6).all()
else:
companies = [current_user.company] if current_user.company else []
cid = current_user.company_id
total_invoices = Document.query.filter_by(company_id=cid, doc_type='invoice').count()
total_quotes = Document.query.filter_by(company_id=cid, doc_type='quote').count()
total_clients = Client.query.filter_by(company_id=cid).count()
total_billed = db.session.query(db.func.sum(Document.total)).filter_by(company_id=cid, doc_type='invoice').scalar() or 0
recent_invoices = Document.query.filter_by(company_id=cid, doc_type='invoice').order_by(Document.created_at.desc()).limit(6).all()
recent_quotes = Document.query.filter_by(company_id=cid, doc_type='quote').order_by(Document.created_at.desc()).limit(6).all()
return render_template('dashboard.html', companies=companies,
total_invoices=total_invoices, total_quotes=total_quotes,
total_clients=total_clients, total_billed=total_billed,
recent_invoices=recent_invoices, recent_quotes=recent_quotes)
# ============================================================
# COMPANIES
# ============================================================
@app.route('/companies')
@login_required
def companies():
if not current_user.is_superadmin(): return redirect(url_for('dashboard'))
return render_template('companies.html', companies=Company.query.filter_by(active=True).all())
@app.route('/companies/new', methods=['GET','POST'])
@login_required
def new_company():
if not current_user.is_superadmin(): return redirect(url_for('dashboard'))
if request.method == 'POST':
logo_path = None
if 'logo' in request.files:
f = request.files['logo']
if f and f.filename:
if not _allowed_image(f.filename):
flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error')
return render_template('company_form.html', company=None)
fn = secure_filename(f.filename)
f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn))
logo_path = f'logos/{fn}'
c = Company(
name=request.form['name'], ein=request.form.get('ein',''),
license_num=request.form.get('license',''), phone=request.form.get('phone',''),
address=request.form.get('address',''), city=request.form.get('city',''),
state=request.form.get('state',''), email=request.form.get('email',''),
website=request.form.get('website',''), manager=request.form.get('manager',''),
authorized=request.form.get('authorized',''),
tax_rate=float(request.form.get('tax_rate',7)),
invoice_prefix=request.form.get('invoice_prefix','INV').upper().strip(),
quote_prefix=request.form.get('quote_prefix','QUO').upper().strip(),
smtp_host=request.form.get('smtp_host',''),
smtp_port=int(request.form.get('smtp_port',587) or 587),
smtp_user=request.form.get('smtp_user',''),
smtp_password=request.form.get('smtp_password',''),
smtp_from_name=request.form.get('smtp_from_name',''),
stripe_secret_key=request.form.get('stripe_secret_key',''),
stripe_publishable_key=request.form.get('stripe_publishable_key',''),
notes=request.form.get('notes',''),
invoice_notes=request.form.get('invoice_notes',''),
quote_notes=request.form.get('quote_notes',''),
logo_path=logo_path
)
db.session.add(c)
db.session.commit()
flash('Compañía creada exitosamente', 'success')
return redirect(url_for('companies'))
return render_template('company_form.html', company=None)
@app.route('/companies/<int:id>/edit', methods=['GET','POST'])
@login_required
def edit_company(id):
if not current_user.is_superadmin(): return redirect(url_for('dashboard'))
c = Company.query.get_or_404(id)
if request.method == 'POST':
if 'logo' in request.files:
f = request.files['logo']
if f and f.filename:
if not _allowed_image(f.filename):
flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error')
return render_template('company_form.html', company=c)
fn = secure_filename(f.filename)
f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn))
c.logo_path = f'logos/{fn}'
if 'signature' in request.files:
f = request.files['signature']
if f and f.filename:
if not _allowed_image(f.filename):
flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error')
return render_template('company_form.html', company=c)
fn = 'sig_' + secure_filename(f.filename)
f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn))
c.signature_path = f'logos/{fn}'
c.name=request.form['name']; c.ein=request.form.get('ein','')
c.license_num=request.form.get('license',''); c.phone=request.form.get('phone','')
c.address=request.form.get('address',''); c.city=request.form.get('city','')
c.state=request.form.get('state',''); c.email=request.form.get('email','')
c.website=request.form.get('website',''); c.manager=request.form.get('manager','')
c.authorized=request.form.get('authorized','')
c.tax_rate=float(request.form.get('tax_rate',7))
c.invoice_prefix=request.form.get('invoice_prefix','INV').upper().strip()
c.quote_prefix=request.form.get('quote_prefix','QUO').upper().strip()
c.smtp_host=request.form.get('smtp_host','')
c.smtp_port=int(request.form.get('smtp_port',587) or 587)
c.smtp_user=request.form.get('smtp_user','')
if request.form.get('smtp_password'): c.smtp_password=request.form.get('smtp_password')
c.smtp_from_name=request.form.get('smtp_from_name','')
if request.form.get('stripe_secret_key'): c.stripe_secret_key=request.form.get('stripe_secret_key')
if request.form.get('stripe_publishable_key'): c.stripe_publishable_key=request.form.get('stripe_publishable_key')
c.notes=request.form.get('notes','')
c.invoice_notes=request.form.get('invoice_notes','')
c.quote_notes=request.form.get('quote_notes','')
db.session.commit()
flash('Compañía actualizada', 'success')
return redirect(url_for('companies'))
return render_template('company_form.html', company=c)
@app.route('/companies/<int:id>/delete', methods=['POST'])
@login_required
def delete_company(id):
if not current_user.is_superadmin(): return jsonify({'error':'No autorizado'}),403
c = Company.query.get_or_404(id)
c.active = False; db.session.commit()
return jsonify({'success':True})
# ============================================================
# USERS
# ============================================================
@app.route('/users')
@login_required
def users():
if not current_user.is_superadmin(): return redirect(url_for('dashboard'))
return render_template('users.html',
users=User.query.filter_by(active=True).all(),
companies=Company.query.filter_by(active=True).all())
@app.route('/users/new', methods=['POST'])
@login_required
def new_user():
if not current_user.is_superadmin(): return jsonify({'error':'No autorizado'}),403
d = request.get_json()
if User.query.filter_by(username=d['username']).first():
return jsonify({'error':'Usuario ya existe'}),400
u = User(username=d['username'], email=d.get('email',''),
full_name=d.get('full_name',''), role=d.get('role','user'),
company_id=d.get('company_id') or None,
smtp_user=d.get('smtp_user',''),
smtp_password=d.get('smtp_password',''),
email_title=d.get('email_title',''))
u.set_password(d['password'])
db.session.add(u); db.session.commit()
return jsonify({'success':True,'id':u.id})
@app.route('/profile')
@login_required
def profile():
return render_template('profile.html')
@app.route('/profile/save', methods=['POST'])
@login_required
def save_profile():
d = request.get_json()
current_user.full_name = d.get('full_name', current_user.full_name)
current_user.smtp_user = d.get('smtp_user', current_user.smtp_user or '')
current_user.email_title = d.get('email_title', current_user.email_title or '')
if d.get('smtp_password'):
current_user.smtp_password = d.get('smtp_password')
if d.get('password'):
current_user.set_password(d['password'])
db.session.commit()
return jsonify({'success': True})
@app.route('/users/<int:id>/edit', methods=['POST'])
@login_required
def edit_user(id):
# Superadmin puede editar cualquiera; cualquier usuario puede editarse a sí mismo
if not current_user.is_superadmin() and current_user.id != id:
return jsonify({'error':'No autorizado'}),403
u = User.query.get_or_404(id)
d = request.get_json()
u.full_name = d.get('full_name', u.full_name)
u.email = d.get('email', u.email)
u.smtp_user = d.get('smtp_user', u.smtp_user or '')
u.email_title = d.get('email_title', u.email_title or '')
if d.get('smtp_password'):
u.smtp_password = d.get('smtp_password')
# Only superadmin can change role and company
if current_user.is_superadmin():
u.role = d.get('role', u.role)
u.company_id = d.get('company_id') or None
if d.get('password'):
u.set_password(d['password'])
db.session.commit()
return jsonify({'success': True})
@app.route('/users/<int:id>/delete', methods=['POST'])
@login_required
def delete_user(id):
if not current_user.is_superadmin(): return jsonify({'error':'No autorizado'}),403
u = User.query.get_or_404(id); u.active=False; db.session.commit()
return jsonify({'success':True})
@app.route('/users/<int:id>/reset-password', methods=['POST'])
@login_required
def reset_password(id):
if not current_user.is_superadmin(): return jsonify({'error':'No autorizado'}),403
d = request.get_json()
u = User.query.get_or_404(id); u.set_password(d['password']); db.session.commit()
return jsonify({'success':True})
# ============================================================
# CLIENTS
# ============================================================
@app.route('/clients')
@login_required
def clients():
if current_user.is_superadmin():
all_clients = Client.query.filter_by(active=True).all()
else:
all_clients = Client.query.filter_by(company_id=current_user.company_id, active=True).all()
companies = Company.query.filter_by(active=True).all()
return render_template('clients.html', clients=all_clients, companies=companies)
@app.route('/clients/new', methods=['POST'])
@login_required
def new_client():
d = request.get_json()
cid = d.get('company_id')
if not current_user.can_access_company(cid): return jsonify({'error':'No autorizado'}),403
c = Client(company_id=cid, name=d['name'], contact=d.get('contact',''),
email=d.get('email',''), phone=d.get('phone',''), address=d.get('address',''),
city=d.get('city',''), state=d.get('state',''),
yacht_name=d.get('yacht_name',''), yacht_info=d.get('yacht_info',''),
notes=d.get('notes',''))
db.session.add(c); db.session.commit()
return jsonify({'success':True,'id':c.id})
@app.route('/clients/<int:id>', methods=['PUT'])
@login_required
def update_client(id):
c = Client.query.get_or_404(id)
if not current_user.can_access_company(c.company_id): return jsonify({'error':'No autorizado'}),403
d = request.get_json()
for f in ['name','contact','email','phone','address','city','state','yacht_name','yacht_info','notes']:
if f in d: setattr(c, f, d[f])
db.session.commit(); return jsonify({'success':True})
@app.route('/clients/<int:id>', methods=['DELETE'])
@login_required
def delete_client(id):
c = Client.query.get_or_404(id)
if not current_user.can_access_company(c.company_id): return jsonify({'error':'No autorizado'}),403
c.active=False; db.session.commit(); return jsonify({'success':True})
# ============================================================
# PRODUCTS
# ============================================================
@app.route('/products')
@login_required
def products():
if current_user.is_superadmin():
all_products = Product.query.filter_by(active=True).all()
else:
all_products = Product.query.filter_by(company_id=current_user.company_id, active=True).all()
companies = Company.query.filter_by(active=True).all()
return render_template('products.html', products=all_products, companies=companies)
@app.route('/products/new', methods=['POST'])
@login_required
def new_product():
d = request.get_json()
cid = d.get('company_id')
if not current_user.can_access_company(cid): return jsonify({'error':'No autorizado'}),403
p = Product(company_id=cid, name=d['name'], description=d.get('description',''),
price=float(d['price']), unit=d.get('unit','hr'), item_type=d.get('item_type','service'))
db.session.add(p); db.session.commit(); return jsonify({'success':True,'id':p.id})
@app.route('/products/<int:id>', methods=['PUT'])
@login_required
def update_product(id):
p = Product.query.get_or_404(id)
if not current_user.can_access_company(p.company_id): return jsonify({'error':'No autorizado'}),403
d = request.get_json()
for f in ['name','description','unit','item_type']:
if f in d: setattr(p, f, d[f])
if 'price' in d: p.price=float(d['price'])
db.session.commit(); return jsonify({'success':True})
@app.route('/products/<int:id>', methods=['DELETE'])
@login_required
def delete_product(id):
p = Product.query.get_or_404(id)
if not current_user.can_access_company(p.company_id): return jsonify({'error':'No autorizado'}),403
p.active=False; db.session.commit(); return jsonify({'success':True})
# ============================================================
# DOCUMENTS (INVOICES + QUOTES)
# ============================================================
@app.route('/invoices')
@login_required
def invoices():
return _doc_list_page('invoice')
@app.route('/quotes')
@login_required
def quotes():
return _doc_list_page('quote')
def _doc_list_page(doc_type):
if current_user.is_superadmin():
docs = Document.query.filter_by(doc_type=doc_type).order_by(Document.created_at.desc()).all()
else:
docs = Document.query.filter_by(doc_type=doc_type, company_id=current_user.company_id).order_by(Document.created_at.desc()).all()
companies = Company.query.filter_by(active=True).all()
if current_user.is_superadmin():
clients = Client.query.filter_by(active=True).all()
products = Product.query.filter_by(active=True).all()
else:
clients = Client.query.filter_by(company_id=current_user.company_id, active=True).all()
products = Product.query.filter_by(company_id=current_user.company_id, active=True).all()
return render_template('documents.html', docs=docs, doc_type=doc_type,
companies=companies, clients=clients, products=products)
@app.route('/documents/new', methods=['POST'])
@login_required
def new_document():
d = request.get_json()
cid = int(d['company_id'])
if not current_user.can_access_company(cid): return jsonify({'error':'No autorizado'}),403
company = Company.query.get(cid)
doc_type = d.get('doc_type','invoice')
line_items = d.get('line_items',[])
subtotal = sum(i['qty']*i['price'] for i in line_items)
tax_rate = company.tax_rate if company else 7
# Tax only on items marked taxable (products/materials). Default: taxable if item_type is product or material
def is_taxable(item):
if 'taxable' in item:
return item['taxable']
return item.get('item_type','service') in ('product','material')
taxable_amt = sum(i['qty']*i['price'] for i in line_items if is_taxable(i))
tax_amount = taxable_amt*(tax_rate/100)
# Auto number — ignore any user-provided number for the internal counter
auto_number = company.get_next_number(doc_type)
# User can override display number but internal counter is untouched
display_number = d.get('number','').strip() or auto_number
doc = Document(
doc_type=doc_type, company_id=cid, client_id=int(d['client_id']),
created_by=current_user.id, number=display_number,
date=datetime.strptime(d['date'],'%Y-%m-%d').date(),
due_date=datetime.strptime(d['due_date'],'%Y-%m-%d').date() if d.get('due_date') else None,
status=d.get('status','draft'), language=d.get('language','en'),
description=d.get('description',''), line_items=json.dumps(line_items),
subtotal=subtotal, tax_rate=tax_rate, tax_amount=tax_amount,
total=subtotal+tax_amount, notes=d.get('notes',''),
prepared_by=d.get('prepared_by',''), signed_by=d.get('signed_by',''),
signature=d.get('signature','')
)
db.session.add(doc)
# Increment internal counter regardless of display number
company.increment_counter(doc_type)
db.session.commit()
return jsonify({'success':True,'id':doc.id,'auto_number':auto_number,'display_number':display_number})
@app.route('/documents/<int:id>', methods=['PUT'])
@login_required
def update_document(id):
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): return jsonify({'error':'No autorizado'}),403
d = request.get_json()
line_items = d.get('line_items', json.loads(doc.line_items or '[]'))
subtotal = sum(i['qty']*i['price'] for i in line_items)
company = Company.query.get(doc.company_id)
tax_rate = company.tax_rate if company else 7
def is_taxable(item):
if 'taxable' in item:
return item['taxable']
return item.get('item_type','service') in ('product','material')
taxable_amt = sum(i['qty']*i['price'] for i in line_items if is_taxable(i))
tax_amount = taxable_amt*(tax_rate/100)
doc.client_id = int(d.get('client_id', doc.client_id))
doc.number = d.get('number', doc.number)
doc.date = datetime.strptime(d['date'],'%Y-%m-%d').date() if d.get('date') else doc.date
doc.due_date = datetime.strptime(d['due_date'],'%Y-%m-%d').date() if d.get('due_date') else doc.due_date
doc.status = d.get('status', doc.status)
doc.language = d.get('language', doc.language)
doc.description = d.get('description', doc.description)
doc.line_items = json.dumps(line_items)
doc.subtotal=subtotal; doc.tax_rate=tax_rate; doc.tax_amount=tax_amount; doc.total=subtotal+tax_amount
doc.notes = d.get('notes', doc.notes)
doc.prepared_by = d.get('prepared_by', doc.prepared_by or '')
doc.signed_by = d.get('signed_by', doc.signed_by or '')
doc.signature = d.get('signature', doc.signature or '')
db.session.commit()
return jsonify({'success':True})
@app.route('/documents/<int:id>/status', methods=['POST'])
@login_required
def update_doc_status(id):
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): return jsonify({'error':'No autorizado'}),403
d = request.get_json()
doc.status = d['status']; db.session.commit()
return jsonify({'success':True})
@app.route('/documents/<int:id>', methods=['DELETE'])
@login_required
def delete_document(id):
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): return jsonify({'error':'No autorizado'}),403
# Delete stored PDF if exists
if doc.pdf_path and os.path.exists(doc.pdf_path):
os.remove(doc.pdf_path)
db.session.delete(doc); db.session.commit()
return jsonify({'success':True})
@app.route('/documents/<int:id>/data')
@login_required
def get_doc_data(id):
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): return jsonify({'error':'No autorizado'}),403
return jsonify({
'id':doc.id, 'doc_type':doc.doc_type, 'number':doc.number,
'company_id':doc.company_id, 'client_id':doc.client_id,
'date':doc.date.strftime('%Y-%m-%d'),
'due_date':doc.due_date.strftime('%Y-%m-%d') if doc.due_date else '',
'status':doc.status, 'language':doc.language, 'description':doc.description or '',
'line_items':json.loads(doc.line_items or '[]'),
'subtotal':doc.subtotal, 'tax_rate':doc.tax_rate,
'tax_amount':doc.tax_amount, 'total':doc.total,
'notes':doc.notes or '', 'prepared_by':doc.prepared_by or '',
'signed_by':doc.signed_by or '', 'signature':doc.signature or '',
'has_pdf': bool(doc.pdf_path and os.path.exists(doc.pdf_path))
})
# ============================================================
# PDF - SAVE ON SERVER + DOWNLOAD
# ============================================================
@app.route('/documents/<int:id>/save-pdf', methods=['POST'])
@login_required
def save_pdf(id):
"""Receive PDF as raw binary (multipart) or base64 JSON, store on server."""
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): return jsonify({'error':'No autorizado'}),403
company_pdf_dir = os.path.join(app.config['PDF_FOLDER'], str(doc.company_id))
os.makedirs(company_pdf_dir, exist_ok=True)
safe_number = re.sub(r'[^\w\-]','_', doc.number)
filename = f"{doc.doc_type}_{safe_number}_{doc.id}.pdf"
filepath = os.path.join(company_pdf_dir, filename)
if request.files.get('pdf'):
f = request.files['pdf']
f.save(filepath)
# Verify file was written correctly
size = os.path.getsize(filepath)
if size < 100:
return jsonify({'error': f'PDF muy pequeño ({size} bytes), posiblemente corrupto'}), 400
else:
d = request.get_json()
if not d: return jsonify({'error':'No data received'}),400
pdf_b64 = d.get('pdf_b64','').strip()
if not pdf_b64: return jsonify({'error':'No PDF data'}),400
if ',' in pdf_b64:
pdf_b64 = pdf_b64.split(',',1)[1]
pdf_b64 += '=' * (-len(pdf_b64) % 4)
try:
pdf_bytes = base64.b64decode(pdf_b64)
with open(filepath, 'wb') as wf:
wf.write(pdf_bytes)
except Exception as e:
return jsonify({'error': f'Error decodificando PDF: {str(e)}'}), 400
doc.pdf_path = filepath
db.session.commit()
return jsonify({'success':True, 'filename': filename})
@app.route('/documents/<int:id>/preview-pdf')
@login_required
def preview_pdf(id):
"""Serve PDF inline for browser preview."""
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): abort(403)
if not doc.pdf_path or not os.path.exists(doc.pdf_path):
return "<html><body style='font-family:sans-serif;padding:40px;background:#0a1628;color:white;'><h2>⚠️ PDF no encontrado</h2><p>Genera el PDF primero usando el botón 📄 Generar PDF</p><script>setTimeout(()=>window.close(),3000)</script></body></html>", 404
response = send_file(doc.pdf_path, as_attachment=False,
download_name=f"{doc.number}.pdf", mimetype='application/pdf')
response.headers['Content-Disposition'] = f'inline; filename="{doc.number}.pdf"'
return response
@app.route('/documents/<int:id>/download-pdf')
@login_required
def download_pdf(id):
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): abort(403)
if not doc.pdf_path or not os.path.exists(doc.pdf_path):
flash('PDF no encontrado. Genera el PDF primero.', 'error')
return redirect(url_for('invoices') if doc.doc_type=='invoice' else url_for('quotes'))
return send_file(doc.pdf_path, as_attachment=True,
download_name=f"{doc.number}.pdf", mimetype='application/pdf')
# ============================================================
# EMAIL PDF
# ============================================================
@app.route('/documents/<int:id>/send-email', methods=['POST'])
@login_required
def send_email_pdf(id):
doc = Document.query.get_or_404(id)
if not current_user.can_access_company(doc.company_id): return jsonify({'error':'No autorizado'}),403
if not doc.pdf_path or not os.path.exists(doc.pdf_path):
return jsonify({'error':'PDF no encontrado. Genera el PDF primero.'}),400
company = Company.query.get(doc.company_id)
# SMTP server from company, credentials from logged-in user (fallback to company)
smtp_host = company.smtp_host
smtp_port = company.smtp_port or 587
smtp_user = current_user.smtp_user or company.smtp_user
smtp_pass = current_user.smtp_password or company.smtp_password
from_name = current_user.email_title or current_user.full_name or company.smtp_from_name or company.name
if not smtp_host or not smtp_user:
return jsonify({'error':'Configura tu email en tu perfil o el SMTP en la compañía primero.'}),400
d = request.get_json()
to_email = d.get('to_email','').strip()
if not to_email: return jsonify({'error':'Email del destinatario requerido.'}),400
subject = d.get('subject', f"{doc.number} - {company.name}")
body = d.get('body', f"Please find attached {doc.doc_type} {doc.number}.\n\nThank you for your business.\n\n{company.name}")
try:
msg = MIMEMultipart()
msg['From'] = f"{from_name} <{smtp_user}>"
msg['To'] = to_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
with open(doc.pdf_path,'rb') as f:
part = MIMEBase('application','octet-stream')
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename="{doc.number}.pdf"')
msg.attach(part)
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls()
server.login(smtp_user, smtp_pass)
# Send to client + BCC to sender for their own record
server.sendmail(smtp_user, [to_email, smtp_user], msg.as_string())
if doc.status == 'draft':
doc.status = 'sent'
db.session.commit()
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': f'Error enviando email: {str(e)}'}),500
# ============================================================
# API
# ============================================================
@app.route('/api/clients/<int:company_id>')
@login_required
def api_clients(company_id):
clients = Client.query.filter_by(company_id=company_id, active=True).all()
return jsonify([{'id':c.id,'name':c.name,'yacht':c.yacht_name,'email':c.email,
'phone':c.phone,'address':c.address,'city':c.city,'state':c.state,
'contact':c.contact,'yacht_info':c.yacht_info} for c in clients])
@app.route('/api/products/<int:company_id>')
@login_required
def api_products(company_id):
prods = Product.query.filter_by(company_id=company_id, active=True).all()
return jsonify([{'id':p.id,'name':p.name,'price':p.price,'unit':p.unit,'desc':p.description} for p in prods])
@app.route('/api/next-number/<int:company_id>/<doc_type>')
@login_required
def api_next_number(company_id, doc_type):
company = Company.query.get_or_404(company_id)
return jsonify({'number': company.get_next_number(doc_type)})
@app.route('/api/company/<int:company_id>')
@login_required
def api_company(company_id):
c = Company.query.get_or_404(company_id)
return jsonify({
'id':c.id,'name':c.name,'ein':c.ein,'license_num':c.license_num,
'phone':c.phone,'address':c.address,'city':c.city,'state':c.state,
'email':c.email,'website':c.website,'manager':c.manager,'authorized':c.authorized,
'tax_rate':c.tax_rate,
'notes':c.notes,
'invoice_notes': c.invoice_notes or c.notes or '',
'quote_notes': c.quote_notes or c.notes or '',
'logo_path':c.logo_path or '',
'signature_path': c.signature_path or '',
'invoice_prefix':c.invoice_prefix,'quote_prefix':c.quote_prefix
})
@app.route('/api/me')
@login_required
def api_me():
"""Retorna info del usuario loggeado incluyendo su firma"""
return jsonify({
'id': current_user.id,
'full_name': current_user.full_name or current_user.username,
'username': current_user.username,
'smtp_user': current_user.smtp_user or '',
'signature': current_user.signature or ''
})
@app.route('/api/me/signature', methods=['POST'])
@login_required
def save_my_signature():
"""Guarda la firma del usuario loggeado"""
d = request.get_json()
sig = d.get('signature','')
if not sig:
return jsonify({'error':'No signature data'}), 400
current_user.signature = sig
db.session.commit()
return jsonify({'success': True})
@app.route('/api/users/<int:company_id>')
@login_required
def api_company_users(company_id):
"""Lista usuarios activos de una compañía para el dropdown de 'Autorizado por'"""
if not current_user.can_access_company(company_id):
return jsonify({'error':'No autorizado'}), 403
users = User.query.filter_by(company_id=company_id, active=True).all()
# Superadmin también aparece
admins = User.query.filter_by(role='superadmin', active=True).all()
all_users = {u.id: u for u in users + admins}
return jsonify([{
'id': u.id,
'full_name': u.full_name or u.username,
'username': u.username
} for u in all_users.values()])
# ============================================================
# AI TRANSLATION (Open WebUI bridge)
# ============================================================
OPENWEBUI_URL = 'http://localhost:11434/api/chat'
OPENWEBUI_MODEL = 'llama3.1:8b'
OPENWEBUI_KEY = '' # Ollama directo no requiere key
def _call_ollama(text):
if not text or not text.strip():
return text
resp = http_requests.post(
OPENWEBUI_URL,
json={
'model': OPENWEBUI_MODEL,
'messages': [
{'role': 'system', 'content': 'You are a professional marine industry translator. Translate the user message to English. Return ONLY the translated text. No quotes, no explanations.'},
{'role': 'user', 'content': text}
],
'stream': False
},
timeout=45
)
resp.raise_for_status()
return resp.json()['message']['content'].strip()
@app.route('/api/translate', methods=['POST'])
@login_required
def translate_text():
if not HAS_REQUESTS:
return jsonify({'translated': []})
data = request.get_json()
texts = data.get('texts', [])
result = []
for text in texts:
try:
result.append(_call_ollama(text))
except Exception as e:
app.logger.warning(f'Translation error: {e}')
result.append(text)
return jsonify({'translated': result})
# ============================================================
# STRIPE PAYMENT
# ============================================================
def get_or_create_payment_token(doc):
if not doc.payment_token:
doc.payment_token = secrets.token_hex(32)
db.session.commit()
return doc.payment_token
def generate_qr_base64(url):
if not HAS_QR:
return None
img = qrcode.make(url)
buf = io.BytesIO()
img.save(buf, format='PNG')
return base64.b64encode(buf.getvalue()).decode()
@app.route('/api/invoice/<int:invoice_id>/payment-link', methods=['POST'])
@login_required
def get_payment_link(invoice_id):
doc = Document.query.get_or_404(invoice_id)
if doc.doc_type != 'invoice':
return jsonify({'error': 'Solo invoices'}), 400
if not current_user.can_access_company(doc.company_id):
return jsonify({'error': 'No autorizado'}), 403
company = Company.query.get(doc.company_id)
if not company or not company.stripe_secret_key:
return jsonify({'has_stripe': False, 'payment_url': None, 'qr': None})
token = get_or_create_payment_token(doc)
base = get_public_base_url()
payment_url = f'{base}/pay/{token}'
qr = generate_qr_base64(payment_url)
return jsonify({'has_stripe': True, 'payment_url': payment_url, 'qr': qr})
@app.route('/pay/<token>')
def public_pay(token):
doc = Document.query.filter_by(payment_token=token).first_or_404()
if doc.doc_type != 'invoice':
return 'Link inválido', 404
if doc.status == 'paid':
return render_template('pay_success.html', doc=doc, already_paid=True)
company = Company.query.get(doc.company_id)
client = doc.client
# Traducir descripción al inglés para la página pública
description_en = doc.description or ''
if description_en and HAS_REQUESTS:
try:
description_en = _call_ollama(description_en)
except Exception:
pass
return render_template('pay_page.html', doc=doc, company=company, client=client,
token=token, description_en=description_en)
@app.route('/pay/<token>/checkout', methods=['POST'])
def stripe_checkout(token):
if not stripe:
return 'Stripe no instalado', 500
doc = Document.query.filter_by(payment_token=token).first_or_404()
if doc.status == 'paid':
return redirect(f'/pay/{token}')
company = Company.query.get(doc.company_id)
if not company or not company.stripe_secret_key:
return 'Stripe no configurado', 400
invoice_cents = int(round(doc.total * 100))
fee_cents = int(round((doc.total * 0.029 + 0.30) * 100))
base = get_public_base_url()
try:
# Pasar api_key por llamada en lugar de asignar el global (thread-safe)
session = stripe.checkout.Session.create(
payment_method_types=['card'],
line_items=[
{
'price_data': {
'currency': 'usd',
'unit_amount': invoice_cents,
'product_data': {'name': f'Invoice {doc.number}{company.name}'},
},
'quantity': 1,
},
{
'price_data': {
'currency': 'usd',
'unit_amount': fee_cents,
'product_data': {'name': 'Credit card processing fee (2.9% + $0.30)'},
},
'quantity': 1,
},
],
mode='payment',
success_url=f'{base}/pay/{token}/success?session_id={{CHECKOUT_SESSION_ID}}',
cancel_url=f'{base}/pay/{token}',
metadata={'invoice_id': doc.id, 'token': token},
api_key=company.stripe_secret_key, # per-request, thread-safe
)
return redirect(session.url)
except Exception as e:
return f'Error Stripe: {e}', 500
@app.route('/pay/<token>/success')
def pay_success(token):
doc = Document.query.filter_by(payment_token=token).first_or_404()
company = Company.query.get(doc.company_id)
session_id = request.args.get('session_id', '')
if doc.status != 'paid':
# Verificar con Stripe antes de marcar como pagado
if stripe and company and company.stripe_secret_key and session_id:
try:
sess = stripe.checkout.Session.retrieve(
session_id, api_key=company.stripe_secret_key)
if sess.payment_status == 'paid':
doc.status = 'paid'
db.session.commit()
except Exception:
pass # El webhook es el canal confiable; aquí solo confirmamos
already_paid = (doc.status == 'paid')
return render_template('pay_success.html', doc=doc, company=company, already_paid=already_paid)
@app.route('/stripe/webhook', methods=['POST'])
def stripe_webhook():
payload = request.data
sig = request.headers.get('Stripe-Signature', '')
webhook_secret = os.environ.get('STRIPE_WEBHOOK_SECRET', '')
if webhook_secret:
try:
event = stripe.Webhook.construct_event(payload, sig, webhook_secret)
if event['type'] == 'checkout.session.completed':
token = event['data']['object'].get('metadata', {}).get('token')
if token:
doc = Document.query.filter_by(payment_token=token).first()
if doc:
doc.status = 'paid'
db.session.commit()
except Exception:
return jsonify({'status': 'error'}), 400
return jsonify({'status': 'ok'})
# ============================================================
# INIT
# ============================================================
def init_db():
with app.app_context():
db.create_all()
if not User.query.filter_by(username='admin').first():
admin = User(username='admin', email='admin@marineinvoice.com',
full_name='Super Admin', role='superadmin')
admin.set_password('admin123')
db.session.add(admin); db.session.commit()
print('✅ Admin creado: usuario=admin, contraseña=admin123')
print('⚠️ Cambia la contraseña después del primer login!')
if __name__ == '__main__':
init_db()
print('🚀 MarineInvoice Pro corriendo en http://localhost:5000')
print('📱 Desde Tailscale: http://100.96.43.86:5000')
app.run(host='0.0.0.0', port=5000, debug=False)