# File: EmailManager/email_manager.py (Python; listed as 343 lines, 12 KiB)

"""
EmailManager — Gmail cleanup & organization
Accounts: alro65@gmail.com, alro65usa@gmail.com
"""
import os
import sys
import base64
import re
import json
import pickle
import time
import urllib.request
sys.stdout.reconfigure(encoding='utf-8')
from datetime import datetime
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# OAuth scopes requested when an account is authorized; authenticate() passes
# this list to the installed-app flow.
SCOPES = [
    'https://www.googleapis.com/auth/gmail.modify',
    'https://www.googleapis.com/auth/gmail.labels',
]
# OAuth client-secrets file, looked up in the same directory as this script.
CREDENTIALS_FILE = os.path.join(os.path.dirname(__file__), 'credentials.json')
# ── Label structure ────────────────────────────────────────────────────────────
# Labels that process_account() ensures exist before it touches any message.
# A '/' separates nesting levels; get_or_create_label() creates parents first.
LABELS_TO_CREATE = [
    "Bancos/Extractos",
    "Bancos/Promo",
    "Trabajo",
    "AutoBooking",
    "Recibos",
    "Newsletters",
]
# ── Bank senders (promo goes to Bancos/Promo, statements to Bancos/Extractos) ─
# is_bank_email() looks for each entry as a plain substring of the lower-cased
# "sender subject" text, so entries must be lower case.
# NOTE(review): because the match is a substring match, short entries also hit
# unrelated text (e.g. 'chase' in "purchase", 'ing.' in "shipping.", 'bold' and
# 'discover' as ordinary words) — consider tightening the matching.
BANK_DOMAINS = [
    'bancolombia', 'davivienda', 'bbva', 'scotiabank', 'citibank',
    'hsbc', 'santander', 'chase', 'wellsfargo', 'bankofamerica',
    'capitalone', 'discover', 'americanexpress', 'amex', 'nequi',
    'daviplata', 'bold', 'bancodeoccidente', 'bancopopular',
    'coopcentral', 'ing.', 'paypal', 'stripe',
]
# Phrases that make is_bank_email() report a bank message as a statement
# (filed under Bancos/Extractos) rather than a promotion (Bancos/Promo).
# Matched as lower-case substrings of the same "sender subject" text.
BANK_STATEMENT_KEYWORDS = [
    'estado de cuenta', 'extracto', 'resumen de cuenta', 'account statement',
    'transaction alert', 'alerta de transacción', 'compra realizada',
    'pago recibido', 'transferencia', 'your statement',
]
# ── Spam/promo keywords in subject or sender ──────────────────────────────────
# Used by is_spam() as lower-case substrings. process_account() builds its
# inbox search from the first eight entries only (SPAM_KEYWORDS[:8]).
SPAM_KEYWORDS = [
    'insurance quote', 'cotización de seguro', 'auto insurance',
    'car insurance', 'life insurance', 'health insurance',
    'get a quote', 'free quote', 'save on insurance',
    'compare rates', 'lowest rates', 'best rates',
    'final notice', 'last chance', 'act now', 'limited time',
    'you\'ve been selected', 'congratulations you won',
    'unclaimed reward', 'claim your prize',
]
# Gmail category label IDs. NOTE(review): not referenced anywhere in the
# visible part of this file — confirm whether it is still needed.
PROMO_CATEGORIES = ['CATEGORY_PROMOTIONS', 'CATEGORY_UPDATES']
def authenticate(account_name: str) -> object:
    """Return an authorized Gmail API client for one account.

    Credentials are cached next to this script in
    ``token_<account_name>.pickle``. A cached token that is still valid is
    used as is. An expired token with a refresh token is refreshed. In every
    other case (no cache, no refresh token, or a refresh the server rejects)
    the interactive OAuth flow is started in the browser. Newly obtained or
    refreshed credentials are written back to the token file.

    Args:
        account_name: Short account identifier used to build the token
            file name.

    Returns:
        The service object returned by ``build('gmail', 'v1', ...)``.
    """
    # Imported here because only this function needs it.
    from google.auth.exceptions import RefreshError

    token_file = os.path.join(os.path.dirname(__file__), f'token_{account_name}.pickle')
    creds = None
    if os.path.exists(token_file):
        # SECURITY NOTE: unpickling runs code embedded in the file, so the
        # token file must only ever be written by this script and must not be
        # readable/writable by other users. Kept as pickle so that existing
        # token files keep working.
        with open(token_file, 'rb') as f:
            creds = pickle.load(f)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except RefreshError:
                # A revoked or expired refresh token cannot be recovered;
                # drop the cached credentials and authorize from scratch
                # instead of aborting the whole run.
                creds = None
        if not creds or not creds.valid:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(token_file, 'wb') as f:
            pickle.dump(creds, f)
    return build('gmail', 'v1', credentials=creds)
def get_or_create_label(service, name: str, label_cache: dict) -> str:
    """Return the Gmail label ID for *name*, creating the label when needed.

    Lookup order: the in-memory cache, then the account's existing labels
    (names compared case-insensitively), then creation. For a nested name
    such as ``"A/B"`` every missing level is created, parents first.

    Args:
        service: Gmail API service object.
        name: Full label name; ``/`` separates nesting levels.
        label_cache: Mapping of label name to label ID, updated in place.

    Returns:
        The ID of the label called *name*.
    """
    try:
        return label_cache[name]
    except KeyError:
        pass

    existing = service.users().labels().list(userId='me').execute().get('labels', [])

    wanted = name.lower()
    hit = next((entry for entry in existing if entry['name'].lower() == wanted), None)
    if hit is not None:
        label_cache[name] = hit['id']
        return hit['id']

    # Walk the path from the outermost parent down to the full name.
    segments = name.split('/')
    for depth in range(len(segments)):
        prefix = '/'.join(segments[:depth + 1])
        if prefix in label_cache:
            continue
        prefix_lower = prefix.lower()
        found = next(
            (entry for entry in existing if entry['name'].lower() == prefix_lower),
            None,
        )
        if found:
            label_cache[prefix] = found['id']
            continue
        created = service.users().labels().create(
            userId='me',
            body={
                'name': prefix,
                'labelListVisibility': 'labelShow',
                'messageListVisibility': 'show',
            },
        ).execute()
        label_cache[prefix] = created['id']
        print(f" [+] Label creado: {prefix}")
    return label_cache[name]
def get_header(headers: list, name: str) -> str:
    """Return the value of the first header called *name*, or ``''``.

    Args:
        headers: Header dicts, each with ``'name'`` and ``'value'`` keys.
        name: Header name to look up; compared case-insensitively.

    Returns:
        The matching header's value, or an empty string when none matches.
    """
    wanted = name.lower()
    return next(
        (entry['value'] for entry in headers if entry['name'].lower() == wanted),
        '',
    )
def extract_unsubscribe_url(headers: list) -> str | None:
    """Pick the web unsubscribe link from a message's ``List-Unsubscribe`` header.

    The header may list several angle-bracketed alternatives (``mailto:``,
    ``http://``, ``https://``). ``mailto:`` entries are ignored. Among the
    web links the first ``https://`` one wins, wherever it appears in the
    list; if there is none, the first ``http://`` link is returned.

    Args:
        headers: Header dicts, each with ``'name'`` and ``'value'`` keys.

    Returns:
        The chosen URL, or None when the header is absent or has no web link.
    """
    raw = get_header(headers, 'List-Unsubscribe')
    if not raw:
        return None
    # Whitespace inside the brackets comes from header folding, not from the
    # URL itself, so drop it before using the link.
    urls = [re.sub(r'\s+', '', u) for u in re.findall(r'<(https?://[^>]+)>', raw)]
    if not urls:
        return None
    # Prefer HTTPS over plain HTTP regardless of the order in the header.
    secure = next((u for u in urls if u.startswith('https://')), None)
    return secure if secure is not None else urls[0]
def try_unsubscribe(url: str) -> bool:
    """Request an HTTPS unsubscribe link and report whether the request succeeded.

    Args:
        url: Unsubscribe URL taken from a ``List-Unsubscribe`` header.

    Returns:
        True when the GET request completed without raising. False when the
        URL is empty or not HTTPS, or when the request failed for any reason
        (including a redirect that would leave HTTPS).
    """
    # Guard: only follow HTTPS unsubscribe links to avoid HTTP downgrade / SSRF.
    # URL schemes are case-insensitive, so compare on a lower-cased copy.
    if not url or not url.lower().startswith('https://'):
        return False
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        # The default opener follows redirects to http:// silently, which would
        # bypass the guard above; use a handler that refuses them.
        opener = urllib.request.build_opener(_HttpsOnlyRedirectHandler)
        # The context manager closes the response (and its socket) right away.
        with opener.open(req, timeout=10):
            return True
    except Exception:
        # Deliberate best effort: the link comes from an untrusted email, so
        # any failure (DNS, TLS, HTTP error status, malformed URL, refused
        # redirect) just means "not unsubscribed".
        return False


class _HttpsOnlyRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that refuses to follow a redirect to a non-HTTPS URL."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Returning None makes urllib report the 3xx response as an HTTPError
        # instead of following it.
        if not newurl.lower().startswith('https://'):
            return None
        return super().redirect_request(req, fp, code, msg, headers, newurl)
def is_bank_email(sender: str, subject: str) -> tuple[bool, bool]:
    """Classify a message as bank mail and, if so, as statement or promo.

    The sender and subject are joined and lower-cased; the message counts as
    bank mail when any ``BANK_DOMAINS`` entry occurs in that text, and as a
    statement when any ``BANK_STATEMENT_KEYWORDS`` entry occurs in it too.

    NOTE(review): both checks are plain substring tests, so short entries
    also match unrelated text (e.g. 'chase' inside "purchase").

    Args:
        sender: Value of the ``From`` header.
        subject: Value of the ``Subject`` header.

    Returns:
        ``(is_bank, is_statement)``; ``is_statement`` is always False when
        ``is_bank`` is False.
    """
    haystack = ' '.join((sender, subject)).lower()
    for fragment in BANK_DOMAINS:
        if fragment in haystack:
            break
    else:
        # No bank fragment anywhere in the text.
        return False, False
    looks_like_statement = any(phrase in haystack for phrase in BANK_STATEMENT_KEYWORDS)
    return True, looks_like_statement
def is_spam(sender: str, subject: str) -> bool:
    """Tell whether the sender or subject contains a known spam phrase.

    Args:
        sender: Value of the ``From`` header.
        subject: Value of the ``Subject`` header.

    Returns:
        True when any ``SPAM_KEYWORDS`` entry occurs as a substring of the
        lower-cased "sender subject" text.
    """
    haystack = ' '.join((sender, subject)).lower()
    for phrase in SPAM_KEYWORDS:
        if phrase in haystack:
            return True
    return False
def process_account(account_name: str):
    """Clean up one Gmail account and return the action counters.

    Steps:
      1. Authenticate and make sure every label in ``LABELS_TO_CREATE`` exists.
      2. Walk all messages in the Promotions/Updates categories, page by
         page. Bank mail is labelled (statement or promo) and removed from
         the inbox; everything else gets an unsubscribe attempt and is moved
         to Trash.
      3. Search the inbox for the first eight ``SPAM_KEYWORDS`` in the subject
         (first result page only) and unsubscribe/trash the hits.
      4. Print a summary.

    A Gmail API error on a single message is counted in ``stats['errors']``
    and processing continues with the next message.

    Args:
        account_name: Short account identifier, forwarded to ``authenticate``.

    Returns:
        Dict with the counters ``unsubscribed``, ``deleted_spam``,
        ``archived_bank_promo``, ``archived_bank_statement`` and ``errors``.
    """
    print(f"\n{'='*60}")
    print(f" Procesando: {account_name}")
    print(f"{'='*60}")
    service = authenticate(account_name)
    label_cache = {}

    # Ensure all labels exist
    print("\n[1] Creando estructura de labels...")
    for lbl in LABELS_TO_CREATE:
        get_or_create_label(service, lbl, label_cache)

    stats = {
        'unsubscribed': 0,
        'deleted_spam': 0,
        'archived_bank_promo': 0,
        'archived_bank_statement': 0,
        'errors': 0,
    }

    # ── Pass 1: Promotions category ────────────────────────────────────────────
    print("\n[2] Analizando correos promocionales...")
    query = 'category:promotions OR category:updates'
    page_token = None
    while True:
        kwargs = {'userId': 'me', 'q': query, 'maxResults': 500}
        if page_token:
            kwargs['pageToken'] = page_token
        try:
            result = service.users().messages().list(**kwargs).execute()
        except HttpError as e:
            print(f" Error listando mensajes: {e}")
            break
        messages = result.get('messages', [])
        if not messages:
            break
        print(f" Encontrados {len(messages)} mensajes en esta página...")
        for msg_ref in messages:
            msg_id = msg_ref['id']
            try:
                headers = _fetch_message_headers(service, msg_id)
                sender = get_header(headers, 'From')
                subject = get_header(headers, 'Subject')
                is_bank, is_statement = is_bank_email(sender, subject)
                if is_bank:
                    _file_bank_message(service, msg_id, is_statement, label_cache, stats)
                else:
                    # Non-bank promo → unsubscribe + delete
                    _unsubscribe_and_trash(
                        service, msg_id, headers, stats, announce_sender=sender
                    )
            except HttpError as e:
                stats['errors'] += 1
                if e.resp.status == 429:
                    # Back off before the next message; this one is not retried.
                    print(" Rate limit — esperando 5s...")
                    time.sleep(5)
        page_token = result.get('nextPageToken')
        if not page_token:
            break

    # ── Pass 2: Explicit spam keywords in inbox ────────────────────────────────
    print("\n[3] Buscando spam por keywords en inbox...")
    spam_query = ' OR '.join(f'subject:"{k}"' for k in SPAM_KEYWORDS[:8])  # Gmail query limit
    try:
        result = service.users().messages().list(
            userId='me', q=f'in:inbox ({spam_query})', maxResults=500
        ).execute()
        messages = result.get('messages', [])
        print(f" Encontrados {len(messages)} mensajes spam por keyword...")
        for msg_ref in messages:
            try:
                headers = _fetch_message_headers(service, msg_ref['id'])
                _unsubscribe_and_trash(service, msg_ref['id'], headers, stats)
            except HttpError:
                stats['errors'] += 1
    except HttpError as e:
        print(f" Error en búsqueda spam: {e}")

    # ── Summary ────────────────────────────────────────────────────────────────
    print(f"\n{'─'*40}")
    print(f" RESUMEN — {account_name}")
    print(f" Unsubscribes realizados : {stats['unsubscribed']}")
    print(f" Correos eliminados : {stats['deleted_spam']}")
    print(f" Bancos/Promo : {stats['archived_bank_promo']}")
    print(f" Bancos/Extractos : {stats['archived_bank_statement']}")
    print(f" Errores : {stats['errors']}")
    print(f"{'─'*40}")
    return stats


def _fetch_message_headers(service, msg_id: str) -> list:
    """Fetch the From, Subject and List-Unsubscribe headers of one message."""
    msg = service.users().messages().get(
        userId='me', id=msg_id,
        format='metadata',
        metadataHeaders=['From', 'Subject', 'List-Unsubscribe'],
    ).execute()
    return msg.get('payload', {}).get('headers', [])


def _file_bank_message(service, msg_id: str, is_statement: bool,
                       label_cache: dict, stats: dict) -> None:
    """Label a bank message as statement or promo and remove it from the inbox."""
    if is_statement:
        label_name, counter = 'Bancos/Extractos', 'archived_bank_statement'
    else:
        label_name, counter = 'Bancos/Promo', 'archived_bank_promo'
    lbl_id = get_or_create_label(service, label_name, label_cache)
    service.users().messages().modify(
        userId='me', id=msg_id,
        body={'addLabelIds': [lbl_id], 'removeLabelIds': ['INBOX']},
    ).execute()
    stats[counter] += 1


def _unsubscribe_and_trash(service, msg_id: str, headers: list, stats: dict,
                           announce_sender: str | None = None) -> None:
    """Try the message's unsubscribe link, then move the message to Trash.

    When *announce_sender* is given, a successful unsubscribe is printed with
    the first 60 characters of that sender.
    """
    unsub_url = extract_unsubscribe_url(headers)
    if unsub_url and try_unsubscribe(unsub_url):
        stats['unsubscribed'] += 1
        if announce_sender is not None:
            print(f" [OK] Unsubscribe: {announce_sender[:60]}")
    service.users().messages().trash(userId='me', id=msg_id).execute()
    stats['deleted_spam'] += 1
def main():
    """Run the cleanup for every configured account and print grand totals.

    Shows a banner, waits for the user to press ENTER, processes each account
    in turn with ``process_account`` and finally prints the total number of
    unsubscribes and trashed messages across all accounts.
    """
    accounts = ['alro65', 'alro65usa']  # token files: token_alro65.pickle, token_alro65usa.pickle
    banner = "=" * 40

    print(banner)
    print(" EmailManager -- Limpieza Gmail")
    print(banner)
    print("\nSe procesaran:")
    print(" - alro65@gmail.com")
    print(" - alro65usa@gmail.com")
    print("\nPara cada cuenta se abrirá el navegador para autenticación OAuth.")
    print("Usa ventana incógnito si es necesario.\n")
    input("Presiona ENTER para comenzar...")

    # Running totals of the two counters reported at the end.
    totals = {'deleted_spam': 0, 'unsubscribed': 0}
    for account in accounts:
        account_stats = process_account(account)
        for key in totals:
            totals[key] += account_stats[key]

    print(f"\n{banner}")
    print(" TOTAL GENERAL")
    print(f" Unsubscribes : {totals['unsubscribed']}")
    print(f" Eliminados : {totals['deleted_spam']}")
    print(f"{banner}")
    print("\nListo. Vacía la papelera en Gmail para liberar espacio.")


# Run the cleanup only when executed as a script, not when imported.
if __name__ == '__main__':
    main()