# Listing metadata carried over from the original paste: 343 lines, 12 KiB, Python.
"""
|
|
EmailManager — Gmail cleanup & organization
|
|
Accounts: alro65@gmail.com, alro65usa@gmail.com
|
|
"""
|
|
|
|
import base64
import json
import os
import pickle
import re
import sys
import time
import urllib.request
from datetime import datetime

from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Switch stdout to UTF-8: the script prints accented and box-drawing characters.
sys.stdout.reconfigure(encoding='utf-8')

# Gmail API OAuth scopes requested during authentication.
SCOPES = [
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/gmail.labels",
]

# The OAuth client-secrets file is looked up in the same directory as this script.
CREDENTIALS_FILE = os.path.join(
    os.path.dirname(__file__),
    "credentials.json",
)

# ── Label structure ────────────────────────────────────────────────────────────
# Labels ensured at the start of every run; "/" separates a parent label from
# its nested child.
LABELS_TO_CREATE = [
    "Bancos/Extractos", "Bancos/Promo",
    "Trabajo", "AutoBooking", "Recibos", "Newsletters",
]

# ── Bank senders ──────────────────────────────────────────────────────────────
# Substrings that mark a message as bank mail. Bank promotions are filed under
# Bancos/Promo and bank statements under Bancos/Extractos.
BANK_DOMAINS = [
    "bancolombia",
    "davivienda",
    "bbva",
    "scotiabank",
    "citibank",
    "hsbc",
    "santander",
    "chase",
    "wellsfargo",
    "bankofamerica",
    "capitalone",
    "discover",
    "americanexpress",
    "amex",
    "nequi",
    "daviplata",
    "bold",
    "bancodeoccidente",
    "bancopopular",
    "coopcentral",
    "ing.",
    "paypal",
    "stripe",
]

# Phrases that mark a bank message as a statement / transaction notice
# rather than a promotion.
BANK_STATEMENT_KEYWORDS = [
    "estado de cuenta",
    "extracto",
    "resumen de cuenta",
    "account statement",
    "transaction alert",
    "alerta de transacción",
    "compra realizada",
    "pago recibido",
    "transferencia",
    "your statement",
]

# ── Spam/promo keywords in subject or sender ──────────────────────────────────
# Order matters: the inbox keyword pass builds its search from the first
# eight entries only.
SPAM_KEYWORDS = [
    "insurance quote",
    "cotización de seguro",
    "auto insurance",
    "car insurance",
    "life insurance",
    "health insurance",
    "get a quote",
    "free quote",
    "save on insurance",
    "compare rates",
    "lowest rates",
    "best rates",
    "final notice",
    "last chance",
    "act now",
    "limited time",
    "you've been selected",
    "congratulations you won",
    "unclaimed reward",
    "claim your prize",
]

# Gmail system label ids for the Promotions and Updates tabs.
PROMO_CATEGORIES = ["CATEGORY_PROMOTIONS", "CATEGORY_UPDATES"]


def authenticate(account_name: str) -> object:
    """Return an authorized Gmail API v1 client for *account_name*.

    Credentials are cached next to this script in
    ``token_<account_name>.pickle``. A cached token that is still valid is
    used as-is; an expired one with a refresh token is refreshed; otherwise
    the interactive OAuth flow is run on a local server port. Whenever the
    credentials were refreshed or newly obtained, the token file is rewritten.

    A token file that cannot be unpickled, or a refresh that Google rejects
    (for example a revoked refresh token), no longer aborts the run: both
    cases fall back to the interactive flow.

    Args:
        account_name: Short account identifier used only to name the token
            file.

    Returns:
        The service object produced by ``build('gmail', 'v1', ...)``.
    """
    token_file = os.path.join(os.path.dirname(__file__), f'token_{account_name}.pickle')
    creds = None

    if os.path.exists(token_file):
        # NOTE(security): pickle.load can execute arbitrary code. This is only
        # acceptable because the file is written by this script itself; never
        # point it at a token file obtained from somewhere else.
        try:
            with open(token_file, 'rb') as f:
                creds = pickle.load(f)
        except (pickle.UnpicklingError, EOFError, AttributeError, ImportError, IndexError):
            print(f" Token ilegible ({token_file}); se pedirá autorización de nuevo.")
            creds = None

    if not creds or not creds.valid:
        refreshed = False
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
                refreshed = True
            except RefreshError:
                # Refresh token revoked or expired: ask the user to consent again.
                print(" No se pudo refrescar el token; se pedirá autorización de nuevo.")
        if not refreshed:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(token_file, 'wb') as f:
            pickle.dump(creds, f)

    return build('gmail', 'v1', credentials=creds)


def get_or_create_label(service, name: str, label_cache: dict) -> str:
    """Return the Gmail label id for *name*, creating the label if needed.

    *label_cache* maps label names to ids and is updated in place. On a cache
    miss the account's labels are listed once and compared case-insensitively.
    If the label is still missing, every prefix of the ``/``-separated path is
    ensured from the top down, so parents exist before their children.

    Args:
        service: Gmail API client exposing ``users().labels()``.
        name: Full label path, e.g. ``"Bancos/Promo"``.
        label_cache: Name-to-id cache shared between calls.

    Returns:
        The id of the label called *name*.
    """
    try:
        return label_cache[name]
    except KeyError:
        pass

    listing = service.users().labels().list(userId='me').execute()
    existing = listing.get('labels', [])

    def find(label_name):
        # First existing label whose name equals label_name, ignoring case.
        return next(
            (entry for entry in existing if entry['name'].lower() == label_name.lower()),
            None,
        )

    hit = find(name)
    if hit is not None:
        label_cache[name] = hit['id']
        return hit['id']

    # Create nested labels parent-first
    prefix = []
    for segment in name.split('/'):
        prefix.append(segment)
        path = '/'.join(prefix)
        if path in label_cache:
            continue

        found = find(path)
        if found:
            label_cache[path] = found['id']
            continue

        created = service.users().labels().create(
            userId='me',
            body={
                'name': path,
                'labelListVisibility': 'labelShow',
                'messageListVisibility': 'show',
            },
        ).execute()
        label_cache[path] = created['id']
        print(f" [+] Label creado: {path}")

    return label_cache[name]


def get_header(headers: list, name: str) -> str:
    """Return the value of the first header called *name*, ignoring case.

    Args:
        headers: List of ``{'name': ..., 'value': ...}`` dicts.
        name: Header name to look for.

    Returns:
        The matching header's value, or ``''`` when no header matches.
    """
    return next(
        (entry['value'] for entry in headers if entry['name'].lower() == name.lower()),
        '',
    )


def extract_unsubscribe_url(headers: list) -> str | None:
    """Pick the web unsubscribe link from a message's List-Unsubscribe header.

    The header may hold several angle-bracketed URIs (``mailto:``, ``http://``,
    ``https://``). ``mailto:`` entries are ignored. Among the web links the
    first ``https://`` one is returned, because ``try_unsubscribe`` refuses
    plain HTTP; previously an ``http://`` link listed first hid a usable
    ``https://`` link that followed it. If there is no HTTPS link, the first
    ``http://`` link is still returned, as before.

    Args:
        headers: List of ``{'name': ..., 'value': ...}`` header dicts.

    Returns:
        The chosen URL, or ``None`` when the header is absent or contains no
        http(s) link.
    """
    raw = get_header(headers, 'List-Unsubscribe')
    if not raw:
        return None

    urls = re.findall(r'<(https?://[^>]+)>', raw)
    if not urls:
        return None

    # Prefer HTTPS over plain HTTP (and both over mailto, which never matches).
    return next((u for u in urls if u.startswith('https://')), urls[0])


def try_unsubscribe(url: str) -> bool:
    """Best-effort GET of an unsubscribe link.

    Args:
        url: Candidate unsubscribe URL.

    Returns:
        ``True`` when the request completed without raising, ``False`` when
        the URL is not HTTPS or the request failed for any reason (``urlopen``
        raises for 4xx/5xx responses, so those also yield ``False``).

    NOTE(review): ``urlopen`` follows redirects, so the HTTPS guard covers only
    the initial URL. A successful GET also does not prove the sender actually
    processed the opt-out — confirm whether one-click POST is wanted.
    """
    # Guard: only follow HTTPS unsubscribe links to avoid HTTP downgrade / SSRF
    if not url.startswith('https://'):
        return False

    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        # Close the response explicitly: this runs once per message, and
        # leaving responses open leaks one socket per call.
        with urllib.request.urlopen(req, timeout=10):
            return True
    except Exception:
        # Deliberately broad: unsubscribing is optional and must never abort
        # the clean-up of the message itself.
        return False


def is_bank_email(sender: str, subject: str) -> tuple[bool, bool]:
    """Classify a message as bank mail and, if so, as a statement or not.

    Sender and subject are joined with a space and lower-cased. The message is
    bank mail when any ``BANK_DOMAINS`` entry occurs in that text, and a
    statement when any ``BANK_STATEMENT_KEYWORDS`` entry also occurs.

    NOTE(review): matching is by plain substring over sender *and* subject, so
    short entries such as 'chase', 'bold', 'discover' or 'ing.' can match
    unrelated text ("purchase", "marketing.") — confirm this is intended.

    Returns:
        ``(is_bank, is_statement)``; ``is_statement`` is always ``False`` when
        ``is_bank`` is ``False``.
    """
    haystack = ' '.join((sender, subject)).lower()

    mentions_bank = False
    for bank in BANK_DOMAINS:
        if bank in haystack:
            mentions_bank = True
            break
    if not mentions_bank:
        return (False, False)

    looks_like_statement = False
    for phrase in BANK_STATEMENT_KEYWORDS:
        if phrase in haystack:
            looks_like_statement = True
            break
    return (True, looks_like_statement)


def is_spam(sender: str, subject: str) -> bool:
    """Return True when any ``SPAM_KEYWORDS`` entry occurs in sender or subject.

    Sender and subject are joined with a space and lower-cased before the
    substring test.
    """
    haystack = ' '.join((sender, subject)).lower()
    for keyword in SPAM_KEYWORDS:
        if keyword in haystack:
            return True
    return False


def _fetch_headers(service, message_id: str) -> list:
    """Return the From / Subject / List-Unsubscribe headers of one message."""
    msg = service.users().messages().get(
        userId='me', id=message_id,
        format='metadata',
        metadataHeaders=['From', 'Subject', 'List-Unsubscribe'],
    ).execute()
    return msg.get('payload', {}).get('headers', [])


def _attempt_unsubscribe(headers: list, stats: dict) -> bool:
    """Follow the message's unsubscribe link; count it in *stats* on success."""
    unsub_url = extract_unsubscribe_url(headers)
    if unsub_url and try_unsubscribe(unsub_url):
        stats['unsubscribed'] += 1
        return True
    return False


def _trash_message(service, message_id: str, stats: dict) -> None:
    """Move one message to Trash and count it in *stats*."""
    service.users().messages().trash(userId='me', id=message_id).execute()
    stats['deleted_spam'] += 1


def _archive_bank_message(service, message_id: str, is_statement: bool,
                          label_cache: dict, stats: dict) -> None:
    """Label a bank message (statement or promo) and take it out of the inbox."""
    if is_statement:
        label_name, counter = 'Bancos/Extractos', 'archived_bank_statement'
    else:
        label_name, counter = 'Bancos/Promo', 'archived_bank_promo'

    lbl_id = get_or_create_label(service, label_name, label_cache)
    service.users().messages().modify(
        userId='me', id=message_id,
        body={'addLabelIds': [lbl_id], 'removeLabelIds': ['INBOX']},
    ).execute()
    stats[counter] += 1


def _record_http_error(error, stats: dict) -> None:
    """Count a per-message API failure; pause when Gmail answers 429."""
    stats['errors'] += 1
    if error.resp.status == 429:
        print(" Rate limit — esperando 5s...")
        time.sleep(5)


def _clean_promotions(service, label_cache: dict, stats: dict) -> None:
    """Pass 1: archive bank mail and trash everything else in Promotions/Updates."""
    query = 'category:promotions OR category:updates'
    page_token = None

    while True:
        kwargs = {'userId': 'me', 'q': query, 'maxResults': 500}
        if page_token:
            kwargs['pageToken'] = page_token
        try:
            result = service.users().messages().list(**kwargs).execute()
        except HttpError as e:
            print(f" Error listando mensajes: {e}")
            break

        messages = result.get('messages', [])
        if not messages:
            break

        print(f" Encontrados {len(messages)} mensajes en esta página...")

        for msg_ref in messages:
            message_id = msg_ref['id']
            try:
                headers = _fetch_headers(service, message_id)
                sender = get_header(headers, 'From')
                subject = get_header(headers, 'Subject')

                is_bank, is_statement = is_bank_email(sender, subject)
                if is_bank:
                    _archive_bank_message(service, message_id, is_statement,
                                          label_cache, stats)
                    continue

                # Non-bank promo → unsubscribe + delete
                if _attempt_unsubscribe(headers, stats):
                    print(f" [OK] Unsubscribe: {sender[:60]}")
                _trash_message(service, message_id, stats)
            except HttpError as e:
                # The failed message is skipped, not retried.
                _record_http_error(e, stats)

        page_token = result.get('nextPageToken')
        if not page_token:
            break


def _clean_keyword_spam(service, stats: dict) -> None:
    """Pass 2: trash inbox messages whose subject matches a spam keyword.

    Only the first eight ``SPAM_KEYWORDS`` are searched and only the first
    result page (up to 500 messages) is processed, as in the original code.
    """
    spam_query = ' OR '.join(f'subject:"{k}"' for k in SPAM_KEYWORDS[:8])  # Gmail query limit

    try:
        result = service.users().messages().list(
            userId='me', q=f'in:inbox ({spam_query})', maxResults=500
        ).execute()
    except HttpError as e:
        print(f" Error en búsqueda spam: {e}")
        return

    messages = result.get('messages', [])
    print(f" Encontrados {len(messages)} mensajes spam por keyword...")

    for msg_ref in messages:
        try:
            headers = _fetch_headers(service, msg_ref['id'])
            _attempt_unsubscribe(headers, stats)
            _trash_message(service, msg_ref['id'], stats)
        except HttpError as e:
            # Same 429 back-off as pass 1 (this pass previously had none).
            _record_http_error(e, stats)


def _print_summary(account_name: str, stats: dict) -> None:
    """Print the per-account counters."""
    print(f"\n{'─'*40}")
    print(f" RESUMEN — {account_name}")
    print(f" Unsubscribes realizados : {stats['unsubscribed']}")
    print(f" Correos eliminados : {stats['deleted_spam']}")
    print(f" Bancos/Promo : {stats['archived_bank_promo']}")
    print(f" Bancos/Extractos : {stats['archived_bank_statement']}")
    print(f" Errores : {stats['errors']}")
    print(f"{'─'*40}")


def process_account(account_name: str) -> dict:
    """Run the full clean-up for one Gmail account and return its counters.

    Steps:
      1. Authenticate and make sure every label in ``LABELS_TO_CREATE`` exists.
      2. Walk all messages in the Promotions/Updates categories: bank mail is
         labelled and removed from the inbox; everything else gets a
         best-effort unsubscribe and is moved to Trash.
      3. Trash inbox messages whose subject matches a spam keyword.
      4. Print a summary.

    Per-message API errors are counted and the message is skipped; a failure
    while listing messages ends that pass.

    Args:
        account_name: Short account identifier passed to ``authenticate``.

    Returns:
        Dict with the keys ``unsubscribed``, ``deleted_spam``,
        ``archived_bank_promo``, ``archived_bank_statement`` and ``errors``.
    """
    print(f"\n{'='*60}")
    print(f" Procesando: {account_name}")
    print(f"{'='*60}")

    service = authenticate(account_name)
    label_cache = {}

    # Ensure all labels exist
    print("\n[1] Creando estructura de labels...")
    for lbl in LABELS_TO_CREATE:
        get_or_create_label(service, lbl, label_cache)

    stats = {
        'unsubscribed': 0,
        'deleted_spam': 0,
        'archived_bank_promo': 0,
        'archived_bank_statement': 0,
        'errors': 0,
    }

    # ── Pass 1: Promotions category ────────────────────────────────────────────
    print("\n[2] Analizando correos promocionales...")
    _clean_promotions(service, label_cache, stats)

    # ── Pass 2: Explicit spam keywords in inbox ────────────────────────────────
    print("\n[3] Buscando spam por keywords en inbox...")
    _clean_keyword_spam(service, stats)

    # ── Summary ────────────────────────────────────────────────────────────────
    _print_summary(account_name, stats)

    return stats


def main():
    """Interactive entry point: clean every configured account, print totals.

    Shows a banner listing the accounts, waits for ENTER, runs
    ``process_account`` for each account in order, then prints the combined
    unsubscribe and deletion counts.
    """
    accounts = ['alro65', 'alro65usa']  # token files: token_alro65.pickle, token_alro65usa.pickle
    rule = "=" * 40

    print(rule)
    print(" EmailManager -- Limpieza Gmail")
    print(rule)
    print("\nSe procesaran:")
    for account in accounts:
        print(f" - {account}@gmail.com")
    print("\nPara cada cuenta se abrirá el navegador para autenticación OAuth.")
    print("Usa ventana incógnito si es necesario.\n")

    input("Presiona ENTER para comenzar...")

    results = [process_account(account) for account in accounts]
    total_unsub = sum(r['unsubscribed'] for r in results)
    total_deleted = sum(r['deleted_spam'] for r in results)

    print(f"\n{rule}")
    print(" TOTAL GENERAL")
    print(f" Unsubscribes : {total_unsub}")
    print(f" Eliminados : {total_deleted}")
    print(rule)
    print("\nListo. Vacía la papelera en Gmail para liberar espacio.")


if __name__ == '__main__':
    main()