195 lines
7.1 KiB
Python
195 lines
7.1 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from sqlalchemy.orm import Session
|
|
from jose import JWTError, jwt
|
|
from passlib.context import CryptContext
|
|
from datetime import datetime, timedelta
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
import uuid
|
|
import os
|
|
|
|
from database import get_db, SessionLocal
|
|
from models.user import User, Role
|
|
|
|
# JWT signing key, read from the environment. Importing this module fails
# fast when the key is missing or empty.
SECRET_KEY = os.environ.get("SECRET_KEY")
if not SECRET_KEY:
    raise RuntimeError("SECRET_KEY no configurado — crea backend/.env con SECRET_KEY=<valor>")

# JWT signing algorithm and token lifetime (in hours) used by create_token().
ALGORITHM = "HS256"
TOKEN_HOURS = 8

# Password hashing context (bcrypt) shared by hash_password()/verify_password().
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Bearer-token extractor; tokenUrl points at the login endpoint defined below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

# All endpoints in this module are mounted under /auth.
router = APIRouter(prefix="/auth", tags=["auth"])
|
|
|
|
# ── Helpers ────────────────────────────────────────────────────────────────
|
|
|
|
def hash_password(plain: str) -> str:
    """Return the hash of ``plain`` produced by the module-level ``pwd_ctx``."""
    hashed = pwd_ctx.hash(plain)
    return hashed
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
    """Return the result of checking ``plain`` against ``hashed`` via ``pwd_ctx``."""
    matches = pwd_ctx.verify(plain, hashed)
    return matches
|
|
|
|
def create_token(data: dict) -> str:
    """Encode ``data`` as a signed JWT that expires in ``TOKEN_HOURS`` hours.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    payload = data.copy()
    # Timezone-aware "now" instead of datetime.utcnow(), which is deprecated
    # since Python 3.12 and returns a naive value that is easy to misread as
    # local time.
    payload["exp"] = datetime.now(timezone.utc) + timedelta(hours=TOKEN_HOURS)
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
    """Resolve the bearer token to an active ``User`` row.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, has no ``sub`` claim, or no active user
            with that username exists.
    """
    unauthorized = HTTPException(
        status_code=401,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if not subject:
        raise unauthorized

    # Only accounts flagged as active may authenticate.
    account = (
        db.query(User)
        .filter(User.username == subject, User.activo == True)  # noqa: E712
        .first()
    )
    if not account:
        raise unauthorized
    return account
|
|
|
|
def require_admin(user: User = Depends(get_current_user)):
    """Dependency that lets only ADMIN or SUPERADMIN users through.

    Returns the authenticated user; raises a 403 ``HTTPException`` otherwise.
    """
    allowed_roles = (Role.ADMIN, Role.SUPERADMIN)
    if user.role in allowed_roles:
        return user
    raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
def require_superadmin(user: User = Depends(get_current_user)):
    """Dependency that lets only SUPERADMIN users through.

    Returns the authenticated user; raises a 403 ``HTTPException`` otherwise.
    """
    if user.role == Role.SUPERADMIN:
        return user
    raise HTTPException(status_code=403, detail="Superadmin access required")
|
|
|
|
# ── Seed initial users ─────────────────────────────────────────────────────
|
|
|
|
def seed_users():
    """Insert the default accounts when the user table is empty.

    Opens its own session via ``SessionLocal`` and does nothing if at least
    one ``User`` row already exists. The session is always closed, even when
    the query, the inserts or the commit raise.
    """
    # NOTE(review): these default passwords are hard-coded and trivially
    # guessable. They are kept for backward compatibility; change them (or
    # load them from the environment) before any non-development deployment.
    default_accounts = (
        # (username, nombre, email, password, role)
        ("admin", "Administrador", "admin@aidsmonitoring.com", "admin123", Role.SUPERADMIN),
        ("operador", "Operador Barranquilla", "operador@aidsmonitoring.com", "operador123", Role.ADMIN),
        ("visor", "Usuario Solo Lectura", "visor@aidsmonitoring.com", "visor123", Role.USER),
    )

    db = SessionLocal()
    try:
        if db.query(User).first():
            return  # Table already has users; never overwrite existing data.
        for username, nombre, email, password, role in default_accounts:
            db.add(User(
                id=str(uuid.uuid4()),
                username=username,
                nombre=nombre,
                email=email,
                hashed_pw=hash_password(password),
                role=role,
            ))
        db.commit()
    finally:
        # Previously skipped when anything above raised, leaking the session.
        db.close()
|
|
|
|
# ── Endpoints ──────────────────────────────────────────────────────────────
|
|
|
|
class TokenResponse(BaseModel):
    """Response body of ``POST /auth/login``."""

    # Signed JWT and its scheme (the login endpoint sends "bearer").
    access_token: str
    token_type: str

    # Identity of the authenticated user, echoed back to the client.
    username: str
    nombre: str
    role: str
|
|
|
|
class UserCreate(BaseModel):
    """Request body of ``POST /auth/users``."""

    username: str
    nombre: str
    email: Optional[str] = None
    # Plain-text password; create_user() hashes it before storing.
    password: str
    # Role assigned to the new account when the client does not specify one.
    role: str = "USER"
    company_id: Optional[str] = None
|
|
|
|
@router.post("/login", response_model=TokenResponse)
def login(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate with username/password and issue a bearer token.

    On success the user's ``ultimo_login`` is updated and committed, and a
    JWT carrying the ``sub`` (username) and ``role`` claims is returned.

    Raises:
        HTTPException: 401 when no active user has that username or the
            password does not match.
    """
    user = (
        db.query(User)
        .filter(User.username == form.username, User.activo == True)  # noqa: E712
        .first()
    )
    if not user or not verify_password(form.password, user.hashed_pw):
        # Same response for "unknown user" and "wrong password". The
        # WWW-Authenticate header matches what get_current_user() sends for
        # the same Bearer scheme; it was missing here.
        raise HTTPException(
            status_code=401,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # NOTE(review): naive UTC is kept because the column's timezone handling
    # is not visible from this module — confirm before switching to an aware
    # datetime.
    user.ultimo_login = datetime.utcnow()
    db.commit()

    token = create_token({"sub": user.username, "role": user.role})
    return TokenResponse(
        access_token=token,
        token_type="bearer",
        username=user.username,
        nombre=user.nombre,
        role=user.role,
    )
|
|
|
|
@router.get("/me")
def me(user: User = Depends(get_current_user)):
    """Return the profile of the currently authenticated user."""
    profile = {
        "username": user.username,
        "nombre": user.nombre,
        "role": user.role,
    }
    # company_id is read defensively: None when the model lacks the attribute.
    profile["company_id"] = getattr(user, "company_id", None)
    return profile
|
|
|
|
@router.get("/users", dependencies=[Depends(require_superadmin)])
def list_users(db: Session = Depends(get_db)):
    """Return every user as a plain dict (superadmin only)."""
    listing = []
    for account in db.query(User).all():
        last_login = account.ultimo_login
        listing.append({
            "id": account.id,
            "username": account.username,
            "nombre": account.nombre,
            "email": account.email,
            "role": account.role,
            "activo": account.activo,
            # None when the model lacks the attribute.
            "company_id": getattr(account, "company_id", None),
            # ISO-8601 string, or None when no login has been recorded.
            "ultimo_login": last_login.isoformat() if last_login else None,
        })
    return listing
|
|
|
|
@router.post("/users", dependencies=[Depends(require_superadmin)])
def create_user(data: UserCreate, db: Session = Depends(get_db)):
    """Create a new user from ``data`` (superadmin only).

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    existing = db.query(User).filter(User.username == data.username).first()
    if existing:
        raise HTTPException(status_code=400, detail="Username already exists")

    new_user = User(
        id=str(uuid.uuid4()),
        username=data.username,
        nombre=data.nombre,
        email=data.email,
        # Only the hash is stored, never the plain-text password.
        hashed_pw=hash_password(data.password),
        role=data.role,
        company_id=data.company_id,
    )
    db.add(new_user)
    db.commit()

    return {
        "ok": True,
        "username": new_user.username,
        "role": new_user.role,
        "company_id": new_user.company_id,
    }
|
|
|
|
@router.put("/users/{username}", dependencies=[Depends(require_superadmin)])
def update_user(username: str, data: dict, db: Session = Depends(get_db)):
    """Apply a partial update to the user named ``username`` (superadmin only).

    Only the keys present in ``data`` are applied: ``nombre``, ``email``,
    ``role``, ``activo``, ``password`` and ``company_id``.

    Raises:
        HTTPException: 404 when no user has that username.
    """
    target = db.query(User).filter(User.username == username).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # Fields copied verbatim whenever the key is present.
    for field in ("nombre", "email", "role", "activo"):
        if field in data:
            setattr(target, field, data[field])

    # An empty or missing password leaves the stored hash untouched.
    new_password = data.get("password")
    if new_password:
        target.hashed_pw = hash_password(new_password)

    # Falsy company_id values (e.g. "") are normalised to None.
    if "company_id" in data:
        target.company_id = data["company_id"] or None

    db.commit()
    return {"ok": True}
|
|
|
|
@router.delete("/users/{username}", dependencies=[Depends(require_superadmin)])
def delete_user(username: str, db: Session = Depends(get_db)):
    """Delete the user named ``username`` (superadmin only).

    Raises:
        HTTPException: 400 when asked to delete ``admin``; 404 when no user
            has that username.
    """
    # The "admin" account is rejected before the database is touched.
    if username == "admin":
        raise HTTPException(status_code=400, detail="Cannot delete admin")

    target = db.query(User).filter(User.username == username).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    db.delete(target)
    db.commit()
    return {"ok": True}
|