diff --git a/app.py b/app.py index 41813d2..9dd6078 100644 --- a/app.py +++ b/app.py @@ -4,7 +4,7 @@ from flask_login import LoginManager, UserMixin, login_user, logout_user, login_ from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename from datetime import datetime -import os, json, smtplib, re, secrets +import os, json, smtplib, re, secrets, time try: import requests as http_requests HAS_REQUESTS = True @@ -40,13 +40,44 @@ from email.mime.text import MIMEText from email import encoders app = Flask(__name__) -app.config['SECRET_KEY'] = 'marineinvoice-secret-key-2024' -app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///marineinvoice.db' + +# ── Security: SECRET_KEY desde variable de entorno ─────────────────────────── +_secret_key = os.environ.get('SECRET_KEY') +if not _secret_key: + _secret_key = 'marineinvoice-secret-key-2024' + print('⚠️ WARNING: SECRET_KEY no configurado en variables de entorno. ' + 'Crea un archivo .env con SECRET_KEY= antes de producción.') +app.config['SECRET_KEY'] = _secret_key + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(BASE_DIR, "instance", "marineinvoice.db")}' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False -app.config['UPLOAD_FOLDER'] = 'static/logos' -app.config['PDF_FOLDER'] = 'static/pdfs' -os.makedirs('static/logos', exist_ok=True) -os.makedirs('static/pdfs', exist_ok=True) +app.config['UPLOAD_FOLDER'] = os.path.join(BASE_DIR, 'static', 'logos') +app.config['PDF_FOLDER'] = os.path.join(BASE_DIR, 'static', 'pdfs') +app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB máximo por request +os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) +os.makedirs(app.config['PDF_FOLDER'], exist_ok=True) +os.makedirs(os.path.join(BASE_DIR, 'instance'), exist_ok=True) + +# Extensiones permitidas en subidas de archivos +ALLOWED_IMG_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} + +def _allowed_image(filename): + return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_IMG_EXTENSIONS + +# ── Rate-limiting simple para login (sin dependencias externas) ────────────── +_login_attempts: dict = {} # ip -> [timestamps] +_LOGIN_MAX = 10 # intentos máximos +_LOGIN_WINDOW = 900 # ventana de 15 minutos + +def _is_rate_limited(ip: str) -> bool: + now = time.time() + attempts = [t for t in _login_attempts.get(ip, []) if now - t < _LOGIN_WINDOW] + _login_attempts[ip] = attempts + return len(attempts) >= _LOGIN_MAX + +def _record_failed_login(ip: str): + _login_attempts.setdefault(ip, []).append(time.time()) db = SQLAlchemy(app) login_manager = LoginManager(app) @@ -203,7 +234,7 @@ class Document(db.Model): creator = db.relationship('User', backref='documents', lazy=True, foreign_keys=[created_by]) @login_manager.user_loader -def load_user(user_id): return User.query.get(int(user_id)) +def load_user(user_id): return db.session.get(User, int(user_id)) # ============================================================ # AUTH @@ -212,12 +243,17 @@ def load_user(user_id): return User.query.get(int(user_id)) def login(): if current_user.is_authenticated: return redirect(url_for('dashboard')) if request.method == 'POST': + ip = request.remote_addr or '0.0.0.0' + if _is_rate_limited(ip): + flash('Demasiados intentos fallidos. Espera 15 minutos.', 'error') + return render_template('login.html') u = request.form.get('username','').strip() p = request.form.get('password','') user = User.query.filter_by(username=u, active=True).first() if user and user.check_password(p): login_user(user, remember=True) return redirect(url_for('dashboard')) + _record_failed_login(ip) flash('Usuario o contraseña incorrectos', 'error') return render_template('login.html') @@ -273,6 +309,9 @@ def new_company(): if 'logo' in request.files: f = request.files['logo'] if f and f.filename: + if not _allowed_image(f.filename): + flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error') + return render_template('company_form.html', company=None) fn = secure_filename(f.filename) f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn)) logo_path = f'logos/{fn}' @@ -313,12 +352,18 @@ def edit_company(id): if 'logo' in request.files: f = request.files['logo'] if f and f.filename: + if not _allowed_image(f.filename): + flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error') + return render_template('company_form.html', company=c) fn = secure_filename(f.filename) f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn)) c.logo_path = f'logos/{fn}' if 'signature' in request.files: f = request.files['signature'] if f and f.filename: + if not _allowed_image(f.filename): + flash('Formato de imagen no permitido (usa PNG, JPG, GIF, WEBP)', 'error') + return render_template('company_form.html', company=c) fn = 'sig_' + secure_filename(f.filename) f.save(os.path.join(app.config['UPLOAD_FOLDER'], fn)) c.signature_path = f'logos/{fn}' @@ -542,8 +587,12 @@ def _doc_list_page(doc_type): else: docs = Document.query.filter_by(doc_type=doc_type, company_id=current_user.company_id).order_by(Document.created_at.desc()).all() companies = Company.query.filter_by(active=True).all() - clients = Client.query.filter_by(active=True).all() - products = Product.query.filter_by(active=True).all() + if current_user.is_superadmin(): + clients = Client.query.filter_by(active=True).all() + products = Product.query.filter_by(active=True).all() + else: + clients = Client.query.filter_by(company_id=current_user.company_id, active=True).all() + products = Product.query.filter_by(company_id=current_user.company_id, active=True).all() return render_template('documents.html', docs=docs, doc_type=doc_type, companies=companies, clients=clients, products=products) @@ -962,11 +1011,11 @@ def stripe_checkout(token): company = Company.query.get(doc.company_id) if not company or not company.stripe_secret_key: return 'Stripe no configurado', 400 - stripe.api_key = company.stripe_secret_key invoice_cents = int(round(doc.total * 100)) fee_cents = int(round((doc.total * 0.029 + 0.30) * 100)) base = get_public_base_url() try: + # Pasar api_key por llamada en lugar de asignar el global (thread-safe) session = stripe.checkout.Session.create( payment_method_types=['card'], line_items=[ @@ -990,7 +1039,8 @@ def stripe_checkout(token): mode='payment', success_url=f'{base}/pay/{token}/success?session_id={{CHECKOUT_SESSION_ID}}', cancel_url=f'{base}/pay/{token}', - metadata={'invoice_id': doc.id, 'token': token} + metadata={'invoice_id': doc.id, 'token': token}, + api_key=company.stripe_secret_key, # per-request, thread-safe ) return redirect(session.url) except Exception as e: @@ -999,10 +1049,21 @@ def stripe_checkout(token): @app.route('/pay//success') def pay_success(token): doc = Document.query.filter_by(payment_token=token).first_or_404() - doc.status = 'paid' - db.session.commit() company = Company.query.get(doc.company_id) - return render_template('pay_success.html', doc=doc, company=company, already_paid=False) + session_id = request.args.get('session_id', '') + if doc.status != 'paid': + # Verificar con Stripe antes de marcar como pagado + if stripe and company and company.stripe_secret_key and session_id: + try: + sess = stripe.checkout.Session.retrieve( + session_id, api_key=company.stripe_secret_key) + if sess.payment_status == 'paid': + doc.status = 'paid' + db.session.commit() + except Exception: + pass # El webhook es el canal confiable; aquí solo confirmamos + already_paid = (doc.status == 'paid') + return render_template('pay_success.html', doc=doc, company=company, already_paid=already_paid) @app.route('/stripe/webhook', methods=['POST']) def stripe_webhook():