"""
EmailManager — Gmail cleanup & organization.

Accounts: alro65@gmail.com, alro65usa@gmail.com
"""
import os
import sys
import base64
import re
import json
import pickle
import time
import urllib.request
from collections.abc import Iterator
from datetime import datetime

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# The script prints accented and box-drawing characters; force UTF-8 output
# where the stream supports it (a replaced stdout may lack ``reconfigure``).
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')

SCOPES = [
    'https://www.googleapis.com/auth/gmail.modify',
    'https://www.googleapis.com/auth/gmail.labels',
]

CREDENTIALS_FILE = os.path.join(os.path.dirname(__file__), 'credentials.json')

# ── Label structure ────────────────────────────────────────────────────────────
LABELS_TO_CREATE = [
    "Bancos/Extractos",
    "Bancos/Promo",
    "Trabajo",
    "AutoBooking",
    "Recibos",
    "Newsletters",
]

# ── Bank senders (promo goes to Bancos/Promo, statements to Bancos/Extractos) ─
BANK_DOMAINS = [
    'bancolombia', 'davivienda', 'bbva', 'scotiabank', 'citibank',
    'hsbc', 'santander', 'chase', 'wellsfargo', 'bankofamerica',
    'capitalone', 'discover', 'americanexpress', 'amex',
    'nequi', 'daviplata', 'bold', 'bancodeoccidente', 'bancopopular',
    'coopcentral', 'ing.', 'paypal', 'stripe',
]

BANK_STATEMENT_KEYWORDS = [
    'estado de cuenta', 'extracto', 'resumen de cuenta',
    'account statement', 'transaction alert', 'alerta de transacción',
    'compra realizada', 'pago recibido', 'transferencia',
    'your statement',
]

# ── Spam/promo keywords in subject or sender ──────────────────────────────────
SPAM_KEYWORDS = [
    'insurance quote', 'cotización de seguro', 'auto insurance',
    'car insurance', 'life insurance', 'health insurance',
    'get a quote', 'free quote', 'save on insurance',
    'compare rates', 'lowest rates', 'best rates',
    'final notice', 'last chance', 'act now', 'limited time',
    'you\'ve been selected', 'congratulations you won',
    'unclaimed reward', 'claim your prize',
]

PROMO_CATEGORIES = ['CATEGORY_PROMOTIONS', 'CATEGORY_UPDATES']

# Only this many leading SPAM_KEYWORDS go into the inbox search, to keep the
# Gmail query short.
SPAM_QUERY_KEYWORD_LIMIT = 8

# Headers requested for every message that is inspected.
_METADATA_HEADERS = ['From', 'Subject', 'List-Unsubscribe']

# Matches each <http(s)://...> entry of a List-Unsubscribe header.
_UNSUBSCRIBE_URL_RE = re.compile(r'<(https?://[^>]+)>')


def authenticate(account_name: str) -> object:
    """Return an authorized Gmail API client for *account_name*.

    Cached credentials are loaded from ``token_<account_name>.pickle`` next to
    this file. Expired credentials that carry a refresh token are refreshed;
    otherwise the interactive OAuth flow is run on a local server. Whenever
    credentials are refreshed or newly obtained they are written back to the
    token file.
    """
    token_file = os.path.join(os.path.dirname(__file__), f'token_{account_name}.pickle')
    creds = None
    if os.path.exists(token_file):
        # SECURITY: unpickling runs arbitrary code. The token file must only
        # ever be one written by this script; never load a file from elsewhere.
        with open(token_file, 'rb') as f:
            creds = pickle.load(f)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(token_file, 'wb') as f:
            pickle.dump(creds, f)
    return build('gmail', 'v1', credentials=creds)


def get_or_create_label(service, name: str, label_cache: dict) -> str:
    """Return the Gmail label id for *name*, creating the label if needed.

    Args:
        service: Gmail API client as returned by :func:`authenticate`.
        name: Label name; ``/`` separates nesting levels.
        label_cache: Mapping of label name to id, updated in place so repeated
            lookups of the same name skip the API call.

    Existing labels are matched case-insensitively. When the label does not
    exist, every missing level of the path is created parent-first.
    """
    if name in label_cache:
        return label_cache[name]

    labels = service.users().labels().list(userId='me').execute().get('labels', [])
    # Lowercased name -> id; setdefault keeps the first label when two names
    # differ only by case.
    existing: dict[str, str] = {}
    for lbl in labels:
        existing.setdefault(lbl['name'].lower(), lbl['id'])

    label_id = existing.get(name.lower())
    if label_id is not None:
        label_cache[name] = label_id
        return label_id

    # Create nested labels parent-first
    parts = name.split('/')
    for depth in range(1, len(parts) + 1):
        partial = '/'.join(parts[:depth])
        if partial in label_cache:
            continue
        label_id = existing.get(partial.lower())
        if label_id is not None:
            label_cache[partial] = label_id
            continue
        body = {
            'name': partial,
            'labelListVisibility': 'labelShow',
            'messageListVisibility': 'show',
        }
        new = service.users().labels().create(userId='me', body=body).execute()
        label_cache[partial] = new['id']
        print(f" [+] Label creado: {partial}")
    return label_cache[name]


def get_header(headers: list, name: str) -> str:
    """Return the value of the first header called *name* (case-insensitive).

    Returns an empty string when no such header is present.
    """
    wanted = name.lower()
    return next((h['value'] for h in headers if h['name'].lower() == wanted), '')


def extract_unsubscribe_url(headers: list) -> str | None:
    """Return a web unsubscribe URL from the ``List-Unsubscribe`` header.

    ``mailto:`` entries are ignored. An ``https://`` link is preferred; if the
    header offers only ``http://`` links the first one is returned. Returns
    ``None`` when the header is missing or contains no web link.
    """
    raw = get_header(headers, 'List-Unsubscribe')
    if not raw:
        return None
    urls = _UNSUBSCRIBE_URL_RE.findall(raw)
    if not urls:
        return None
    # try_unsubscribe() only follows HTTPS, so pick an HTTPS entry even when
    # an HTTP one is listed first.
    return next((u for u in urls if u.startswith('https://')), urls[0])


def try_unsubscribe(url: str) -> bool:
    """Issue a GET to *url* and report whether the request succeeded.

    Returns ``False`` for non-HTTPS URLs and for any request failure.

    NOTE: a ``True`` result only means the request completed without raising;
    it does not prove the sender processed the unsubscribe. ``urlopen`` also
    follows redirects, so later hops are not covered by the HTTPS check below.
    """
    # Guard: only follow HTTPS unsubscribe links to avoid HTTP downgrade / SSRF
    if not url.startswith('https://'):
        return False
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        # Close the response so each call does not leak a socket.
        with urllib.request.urlopen(req, timeout=10):
            return True
    except Exception:
        # Deliberate best-effort: the URL points at an arbitrary third-party
        # server and any failure simply means "not unsubscribed".
        return False


def is_bank_email(sender: str, subject: str) -> tuple[bool, bool]:
    """Returns (is_bank, is_statement).

    Both flags come from case-insensitive substring matches of BANK_DOMAINS
    and BANK_STATEMENT_KEYWORDS against the sender and subject combined;
    ``is_statement`` is only ever true when ``is_bank`` is.
    """
    combined = (sender + ' ' + subject).lower()
    if not any(b in combined for b in BANK_DOMAINS):
        return False, False
    is_statement = any(k in combined for k in BANK_STATEMENT_KEYWORDS)
    return True, is_statement


def is_spam(sender: str, subject: str) -> bool:
    """Return True if sender or subject contains any of SPAM_KEYWORDS."""
    combined = (sender + ' ' + subject).lower()
    return any(k in combined for k in SPAM_KEYWORDS)


def _iter_message_pages(service, query: str, page_size: int = 500) -> Iterator[list]:
    """Yield each non-empty page of message refs matching *query*.

    Follows ``nextPageToken`` until the results are exhausted. An
    ``HttpError`` from a list call propagates to the caller.
    """
    page_token = None
    while True:
        kwargs = {'userId': 'me', 'q': query, 'maxResults': page_size}
        if page_token:
            kwargs['pageToken'] = page_token
        result = service.users().messages().list(**kwargs).execute()
        messages = result.get('messages', [])
        if not messages:
            return
        yield messages
        page_token = result.get('nextPageToken')
        if not page_token:
            return


def _fetch_headers(service, msg_id: str) -> list:
    """Fetch the From / Subject / List-Unsubscribe headers of one message."""
    msg = service.users().messages().get(
        userId='me',
        id=msg_id,
        format='metadata',
        metadataHeaders=_METADATA_HEADERS,
    ).execute()
    return msg.get('payload', {}).get('headers', [])


def _unsubscribe_and_trash(service, msg_id: str, headers: list, stats: dict,
                           announce_sender: str | None = None) -> None:
    """Try the message's unsubscribe link, then move the message to trash.

    Updates ``stats['unsubscribed']`` and ``stats['deleted_spam']``. When
    *announce_sender* is given, a successful unsubscribe is printed.
    """
    unsub_url = extract_unsubscribe_url(headers)
    if unsub_url and try_unsubscribe(unsub_url):
        stats['unsubscribed'] += 1
        if announce_sender is not None:
            print(f" [OK] Unsubscribe: {announce_sender[:60]}")
    service.users().messages().trash(userId='me', id=msg_id).execute()
    stats['deleted_spam'] += 1


def _process_promo_message(service, msg_id: str, label_cache: dict, stats: dict) -> None:
    """Archive a bank message under its label, or unsubscribe + trash others."""
    headers = _fetch_headers(service, msg_id)
    sender = get_header(headers, 'From')
    subject = get_header(headers, 'Subject')

    is_bank, is_statement = is_bank_email(sender, subject)
    if not is_bank:
        # Non-bank promo → unsubscribe + delete
        _unsubscribe_and_trash(service, msg_id, headers, stats, announce_sender=sender)
        return

    if is_statement:
        label_name, stat_key = 'Bancos/Extractos', 'archived_bank_statement'
    else:
        label_name, stat_key = 'Bancos/Promo', 'archived_bank_promo'
    lbl_id = get_or_create_label(service, label_name, label_cache)
    # Removing INBOX archives the message; the bank label keeps it findable.
    service.users().messages().modify(
        userId='me',
        id=msg_id,
        body={'addLabelIds': [lbl_id], 'removeLabelIds': ['INBOX']},
    ).execute()
    stats[stat_key] += 1


def _record_http_error(error: HttpError, stats: dict) -> None:
    """Count a per-message API failure and back off on HTTP 429."""
    stats['errors'] += 1
    if error.resp.status == 429:
        print(" Rate limit — esperando 5s...")
        time.sleep(5)


def process_account(account_name: str) -> dict:
    """Clean up one Gmail account and return its counters.

    Steps:
      1. Ensure every label in LABELS_TO_CREATE exists.
      2. For each message in the promotions/updates categories: archive bank
         mail under ``Bancos/Extractos`` or ``Bancos/Promo``; for everything
         else try to unsubscribe and move the message to trash.
      3. For inbox messages whose subject matches the leading SPAM_KEYWORDS:
         try to unsubscribe and move the message to trash.

    Returns:
        Dict with the keys ``unsubscribed``, ``deleted_spam``,
        ``archived_bank_promo``, ``archived_bank_statement`` and ``errors``.
    """
    print(f"\n{'='*60}")
    print(f" Procesando: {account_name}")
    print(f"{'='*60}")

    service = authenticate(account_name)
    label_cache: dict = {}

    # Ensure all labels exist
    print("\n[1] Creando estructura de labels...")
    for lbl in LABELS_TO_CREATE:
        get_or_create_label(service, lbl, label_cache)

    stats = {
        'unsubscribed': 0,
        'deleted_spam': 0,
        'archived_bank_promo': 0,
        'archived_bank_statement': 0,
        'errors': 0,
    }

    # ── Pass 1: Promotions category ────────────────────────────────────────────
    print("\n[2] Analizando correos promocionales...")
    query = 'category:promotions OR category:updates'
    try:
        for messages in _iter_message_pages(service, query):
            print(f" Encontrados {len(messages)} mensajes en esta página...")
            for msg_ref in messages:
                try:
                    _process_promo_message(service, msg_ref['id'], label_cache, stats)
                except HttpError as e:
                    _record_http_error(e, stats)
    except HttpError as e:
        # Only listing failures reach here; per-message errors are handled above.
        print(f" Error listando mensajes: {e}")

    # ── Pass 2: Explicit spam keywords in inbox ────────────────────────────────
    print("\n[3] Buscando spam por keywords en inbox...")
    spam_query = ' OR '.join(
        f'subject:"{k}"' for k in SPAM_KEYWORDS[:SPAM_QUERY_KEYWORD_LIMIT]
    )  # Gmail query limit
    spam_refs: list = []
    try:
        # Collect every page up front so trashing does not disturb pagination.
        for page in _iter_message_pages(service, f'in:inbox ({spam_query})'):
            spam_refs.extend(page)
    except HttpError as e:
        print(f" Error en búsqueda spam: {e}")
    else:
        print(f" Encontrados {len(spam_refs)} mensajes spam por keyword...")
        for msg_ref in spam_refs:
            try:
                headers = _fetch_headers(service, msg_ref['id'])
                _unsubscribe_and_trash(service, msg_ref['id'], headers, stats)
            except HttpError as e:
                _record_http_error(e, stats)

    # ── Summary ────────────────────────────────────────────────────────────────
    print(f"\n{'─'*40}")
    print(f" RESUMEN — {account_name}")
    print(f" Unsubscribes realizados : {stats['unsubscribed']}")
    print(f" Correos eliminados : {stats['deleted_spam']}")
    print(f" Bancos/Promo : {stats['archived_bank_promo']}")
    print(f" Bancos/Extractos : {stats['archived_bank_statement']}")
    print(f" Errores : {stats['errors']}")
    print(f"{'─'*40}")

    return stats


def main():
    """Run the cleanup for every configured account and print grand totals."""
    accounts = ['alro65', 'alro65usa']  # token files: token_alro65.pickle, token_alro65usa.pickle

    print("=" * 40)
    print(" EmailManager -- Limpieza Gmail")
    print("=" * 40)
    print("\nSe procesaran:")
    for acc in accounts:
        print(f" - {acc}@gmail.com")
    print("\nPara cada cuenta se abrirá el navegador para autenticación OAuth.")
    print("Usa ventana incógnito si es necesario.\n")

    input("Presiona ENTER para comenzar...")

    total_deleted = 0
    total_unsub = 0
    for acc in accounts:
        stats = process_account(acc)
        total_deleted += stats['deleted_spam']
        total_unsub += stats['unsubscribed']

    print(f"\n{'='*40}")
    print(" TOTAL GENERAL")
    print(f" Unsubscribes : {total_unsub}")
    print(f" Eliminados : {total_deleted}")
    print(f"{'='*40}")
    print("\nListo. Vacía la papelera en Gmail para liberar espacio.")


if __name__ == '__main__':
    main()