hermes-proton/skills/proton_mail/tools.py
B.A. Baracus f8b9991207
feat(proton-mail): Hermes skill — IMAP/SMTP tools via Bridge
Full Proton Mail Bridge Hermes skill with 6 tools:
- proton_mail_bridge_status — check daemon health
- proton_mail_list — list inbox/folder messages
- proton_mail_read — read full message by UID (body+headers)
- proton_mail_search — search by subject/from/body/all
- proton_mail_send — send email with CC/BCC support
- proton_mail_reply — reply preserving In-Reply-To/References

Implementation: pure Python stdlib (imaplib + smtplib + email),
no external dependencies. 22 unit tests with mocked IMAP/SMTP.

Follows architecture from ARCHITECTURE.md (section 3).
Per-tool auth via PROTONMAIL_ACCOUNT + PROTONMAIL_BRIDGE_PASSWORD env vars.
Bridge runs on 127.0.0.1:1143 (IMAP TLS) / 127.0.0.1:1025 (SMTP STARTTLS).
2026-06-08 18:31:07 +02:00

726 lines
24 KiB
Python

"""Proton Mail Bridge Hermes skill — IMAP/SMTP tool handlers.
Requires Proton Mail Bridge running locally:
IMAP: 127.0.0.1:1143 (TLS)
SMTP: 127.0.0.1:1025 (STARTTLS)
Credentials from env:
PROTONMAIL_ACCOUNT — email address (e.g. user@proton.me)
PROTONMAIL_BRIDGE_PASSWORD — Bridge-generated app password
PROTONMAIL_IMAP_HOST — optional, default 127.0.0.1
PROTONMAIL_IMAP_PORT — optional, default 1143
PROTONMAIL_SMTP_HOST — optional, default 127.0.0.1
PROTONMAIL_SMTP_PORT — optional, default 1025
"""
import email
import imaplib
import json
import os
import smtplib
import socket
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import decode_header
from email.utils import formatdate, parsedate_to_datetime
from typing import Optional
# ── Constants ──────────────────────────────────────────────────────────
DEFAULT_IMAP_HOST = "127.0.0.1"
DEFAULT_IMAP_PORT = 1143
DEFAULT_SMTP_HOST = "127.0.0.1"
DEFAULT_SMTP_PORT = 1025
FETCH_TIMEOUT = 30
LIST_MAX_DEFAULT = 20
# ── Credential helpers ─────────────────────────────────────────────────
def _get_config() -> dict:
"""Read Bridge connection config from environment."""
account = os.environ.get("PROTONMAIL_ACCOUNT")
password = os.environ.get("PROTONMAIL_BRIDGE_PASSWORD")
return {
"account": account,
"password": password,
"imap_host": os.environ.get("PROTONMAIL_IMAP_HOST", DEFAULT_IMAP_HOST),
"imap_port": int(os.environ.get("PROTONMAIL_IMAP_PORT", DEFAULT_IMAP_PORT)),
"smtp_host": os.environ.get("PROTONMAIL_SMTP_HOST", DEFAULT_SMTP_HOST),
"smtp_port": int(os.environ.get("PROTONMAIL_SMTP_PORT", DEFAULT_SMTP_PORT)),
}
def _check_credentials(cfg: dict) -> Optional[str]:
"""Return error string if credentials are missing, else None."""
if not cfg["account"]:
return "PROTONMAIL_ACCOUNT environment variable is not set"
if not cfg["password"]:
return "PROTONMAIL_BRIDGE_PASSWORD environment variable is not set"
return None
def _sanitize_search_term(term: str) -> str:
"""Sanitize an IMAP search term — strip control characters and quotes.
Prevents IMAP injection. Only allows printable ASCII + common unicode
letters used in subjects.
"""
# Remove characters that could break the IMAP SEARCH command
sanitized = "".join(c for c in term if c.isprintable() and c not in '()"*%\\')
return sanitized.strip()
# ── IMAP connection ────────────────────────────────────────────────────
def _open_imap(cfg: dict) -> tuple[imaplib.IMAP4_SSL, Optional[str]]:
"""Connect and authenticate to Bridge IMAP.
Returns (connection, None) on success, (None, error_string) on failure.
"""
try:
conn = imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], timeout=FETCH_TIMEOUT)
status, data = conn.login(cfg["account"], cfg["password"])
if status != "OK":
msg = data[0].decode("utf-8", errors="replace") if data else "Login failed"
conn.logout()
return None, msg
return conn, None
except (ConnectionRefusedError, socket.timeout, OSError) as e:
return None, f"Cannot connect to Bridge IMAP at {cfg['imap_host']}:{cfg['imap_port']}: {e}"
except imaplib.IMAP4.error as e:
return None, f"IMAP error: {e}"
def _decode_mime_header(value: str | bytes | None) -> str:
"""Decode a MIME-encoded header value to plain text."""
if value is None:
return ""
if isinstance(value, bytes):
value = value.decode("utf-8", errors="replace")
decoded_parts = decode_header(value)
parts = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
try:
parts.append(part.decode(charset or "utf-8", errors="replace"))
except LookupError:
parts.append(part.decode("utf-8", errors="replace"))
else:
parts.append(str(part))
return " ".join(parts)
def _get_body_text(msg: email.message.Message) -> str:
"""Extract plain text body from an email message.
Prefers text/plain; falls back to text/html with note.
"""
if msg.is_multipart():
# First try text/plain
for part in msg.walk():
ctype = part.get_content_type()
if ctype == "text/plain":
payload = part.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
# Fall back to text/html
for part in msg.walk():
ctype = part.get_content_type()
if ctype == "text/html":
payload = part.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
return ""
else:
payload = msg.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
return ""
def _parse_fetch_response(msg_data: list) -> dict | None:
"""Parse a single IMAP FETCH response into a structured dict.
Returns dict with uid, subject, from_, to, date, body, message_id, flags,
or None if the data can't be parsed.
"""
if not msg_data or msg_data[0] is None:
return None
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
uid_flags_bytes, rfc822_bytes = raw
else:
return None
# Parse the RFC822 message
try:
msg = email.message_from_bytes(rfc822_bytes)
except Exception:
return None
# Extract UID from the wrapper — look for "UID <number>" keyword
uid_str = uid_flags_bytes.decode("utf-8", errors="replace")
uid = None
parts = uid_str.split()
for i, part in enumerate(parts):
keyword = part.lstrip("(").upper()
if keyword == "UID" and i + 1 < len(parts):
candidate = parts[i + 1].rstrip(")")
if candidate.isdigit():
uid = int(candidate)
break
subject = _decode_mime_header(msg.get("Subject", ""))
sender = _decode_mime_header(msg.get("From", ""))
recipient = _decode_mime_header(msg.get("To", ""))
date_raw = msg.get("Date", "")
message_id = msg.get("Message-ID", "")
# Parse date
date_iso = ""
if date_raw:
try:
dt = parsedate_to_datetime(date_raw)
date_iso = dt.isoformat()
except (ValueError, TypeError):
date_iso = date_raw
body = _get_body_text(msg)
return {
"uid": uid,
"subject": subject,
"from": sender,
"to": recipient,
"date": date_iso,
"body": body,
"message_id": message_id,
"flags": [],
}
# ── SMTP connection ────────────────────────────────────────────────────
def _open_smtp(cfg: dict) -> tuple[smtplib.SMTP | None, str | None]:
"""Connect and authenticate to Bridge SMTP.
Returns (connection, None) on success, (None, error_string) on failure.
"""
try:
conn = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=FETCH_TIMEOUT)
conn.starttls()
conn.login(cfg["account"], cfg["password"])
return conn, None
except (ConnectionRefusedError, socket.timeout, OSError) as e:
return None, f"Cannot connect to Bridge SMTP: {e}"
except smtplib.SMTPException as e:
return None, f"SMTP error: {e}"
# ── Port check ─────────────────────────────────────────────────────────
def _check_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
"""Check if a TCP port is open (Bridge running check)."""
try:
s = socket.create_connection((host, port), timeout=timeout)
s.close()
return True
except (ConnectionRefusedError, socket.timeout, OSError):
return False
# ── Tool Handlers ──────────────────────────────────────────────────────
def proton_mail_bridge_status(args: dict) -> str:
"""Check Proton Bridge status — running, authenticated, and reachable.
Tool schema:
name: proton_mail_bridge_status
description: Check if Proton Mail Bridge daemon is running and reachable
parameters: {}
"""
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"status": "unconfigured", "error": cred_err})
imap_up = _check_port_open(cfg["imap_host"], cfg["imap_port"])
smtp_up = _check_port_open(cfg["smtp_host"], cfg["smtp_port"])
if imap_up and smtp_up:
# Try a real IMAP login to confirm auth works
conn, err = _open_imap(cfg)
if conn:
conn.logout()
return json.dumps({
"status": "running",
"imap": f"{cfg['imap_host']}:{cfg['imap_port']}",
"smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}",
"authenticated": True,
})
else:
return json.dumps({
"status": "running",
"imap": f"{cfg['imap_host']}:{cfg['imap_port']}",
"smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}",
"authenticated": False,
"auth_error": err,
})
ports = []
if not imap_up:
ports.append(f"IMAP {cfg['imap_host']}:{cfg['imap_port']}")
if not smtp_up:
ports.append(f"SMTP {cfg['smtp_host']}:{cfg['smtp_port']}")
return json.dumps({
"status": "stopped",
"unreachable": ports,
})
def proton_mail_list(args: dict) -> str:
"""List messages in a mailbox folder.
Tool schema:
name: proton_mail_list
description: List recent email messages in a folder (default INBOX)
parameters:
type: object
properties:
folder:
type: string
description: Mailbox folder (INBOX, Sent, Drafts, etc.)
default: INBOX
limit:
type: integer
description: Max messages to return (1-100)
default: 20
"""
folder = args.get("folder", "INBOX")
limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100)
limit = max(limit, 1)
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
status, data = conn.select(f'"{folder}"', readonly=True)
if status != "OK":
return json.dumps({
"success": False, "error": f"Cannot select folder '{folder}'"
})
# Search all messages, newest first
status, data = conn.search(None, "ALL")
if status != "OK":
return json.dumps({
"success": False, "error": "IMAP search failed"
})
uids = data[0].split() if data[0] else []
if not uids:
return json.dumps({
"success": True, "messages": [], "folder": folder
})
# Take the N most recent (last N in the list)
recent_uids = uids[-limit:]
messages = []
for uid_bytes in recent_uids:
uid = uid_bytes.decode("ascii")
status, msg_data = conn.fetch(uid, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])")
if status != "OK" or not msg_data or msg_data[0] is None:
continue
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
header_bytes = raw[1]
msg = email.message_from_bytes(header_bytes)
messages.append({
"uid": int(uid),
"subject": _decode_mime_header(msg.get("Subject", "(no subject)")),
"from": _decode_mime_header(msg.get("From", "")),
"to": _decode_mime_header(msg.get("To", "")),
"date": msg.get("Date", ""),
})
return json.dumps({
"success": True,
"messages": messages,
"folder": folder,
"total": len(uids),
})
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_read(args: dict) -> str:
"""Read a full email message by UID.
Tool schema:
name: proton_mail_read
description: Read a full email message including body content
parameters:
type: object
properties:
uid:
type: integer
description: UID of the message to read
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
"""
uid = args.get("uid")
if uid is None:
return json.dumps({"success": False, "error": "Missing required parameter: uid"})
folder = args.get("folder", "INBOX")
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)")
if status != "OK" or not msg_data or msg_data[0] is None:
return json.dumps({
"success": False, "error": f"Message UID {uid} not found in {folder}"
})
parsed = _parse_fetch_response(msg_data)
if parsed is None:
return json.dumps({"success": False, "error": "Failed to parse message"})
parsed["success"] = True
return json.dumps(parsed)
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_search(args: dict) -> str:
"""Search messages across a mailbox folder.
Tool schema:
name: proton_mail_search
description: Search email messages by query in a specific field
parameters:
type: object
properties:
query:
type: string
description: Search query text (min 2 characters)
field:
type: string
description: Field to search (subject, from, body, or all)
enum: [subject, from, body, all]
default: all
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
limit:
type: integer
description: Max results (1-100)
default: 20
"""
query = args.get("query", "")
if not query or len(query.strip()) < 2:
return json.dumps({
"success": False,
"error": "Search query must be at least 2 characters",
})
field = args.get("field", "all")
folder = args.get("folder", "INBOX")
limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100)
limit = max(limit, 1)
query = _sanitize_search_term(query)
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
# Build IMAP SEARCH criteria
search_criteria = []
if field == "subject":
search_criteria = ["SUBJECT", query]
elif field == "from":
search_criteria = ["FROM", query]
elif field == "body":
search_criteria = ["BODY", query]
else:
# Search across all fields
search_criteria = ["OR", "OR", "SUBJECT", query, "FROM", query, "BODY", query]
status, data = conn.search(None, *search_criteria)
if status != "OK":
return json.dumps({"success": False, "error": "IMAP search failed"})
uid_list = data[0].split() if data[0] else []
if not uid_list:
return json.dumps({"success": True, "messages": [], "folder": folder})
recent = uid_list[-limit:]
messages = []
for uid_bytes in recent:
uid_str = uid_bytes.decode("ascii")
status, msg_data = conn.fetch(uid_str, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])")
if status != "OK" or not msg_data or msg_data[0] is None:
continue
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
header_bytes = raw[1]
msg = email.message_from_bytes(header_bytes)
messages.append({
"uid": int(uid_str),
"subject": _decode_mime_header(msg.get("Subject", "(no subject)")),
"from": _decode_mime_header(msg.get("From", "")),
"to": _decode_mime_header(msg.get("To", "")),
"date": msg.get("Date", ""),
})
return json.dumps({
"success": True,
"messages": messages,
"folder": folder,
"query": query,
})
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_send(args: dict) -> str:
"""Send a new email via Proton Bridge SMTP.
Tool schema:
name: proton_mail_send
description: Send a new email message
parameters:
type: object
properties:
to:
type: string
description: Recipient email address(es), comma-separated
cc:
type: string
description: CC recipient(s), comma-separated
bcc:
type: string
description: BCC recipient(s), comma-separated
subject:
type: string
description: Email subject line
body:
type: string
description: Email body text (plain text)
"""
to = args.get("to", "")
cc = args.get("cc", "")
bcc = args.get("bcc", "")
subject = args.get("subject", "").strip()
body = args.get("body", "")
if not to:
return json.dumps({"success": False, "error": "Missing required parameter: to"})
if not subject:
return json.dumps({"success": False, "error": "Missing required parameter: subject"})
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_smtp(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = cfg["account"]
msg["To"] = to
msg["Date"] = formatdate(localtime=True)
msg["X-Mailer"] = "hermes-proton-mail-skill"
if cc:
msg["Cc"] = cc
recipients = [r.strip() for r in to.split(",") if r.strip()]
if cc:
recipients.extend(r.strip() for r in cc.split(",") if r.strip())
if bcc:
recipients.extend(r.strip() for r in bcc.split(",") if r.strip())
bcc_list = [r.strip() for r in bcc.split(",") if r.strip()]
else:
bcc_list = []
# Send — SMTP lib handles BCC by not adding to the envelope
errors, message_id = conn.send_message(msg, from_addr=cfg["account"],
to_addrs=recipients)
return json.dumps({
"success": True,
"message_id": message_id,
"to": to,
"subject": subject,
})
except smtplib.SMTPException as e:
return json.dumps({"success": False, "error": f"SMTP error: {e}"})
finally:
try:
conn.quit()
except Exception:
pass
def proton_mail_reply(args: dict) -> str:
"""Reply to an existing email by UID.
Tool schema:
name: proton_mail_reply
description: Reply to an existing email, preserving thread headers
parameters:
type: object
properties:
uid:
type: integer
description: UID of the message to reply to
body:
type: string
description: Reply body text
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
"""
uid = args.get("uid")
body = args.get("body", "")
if uid is None:
return json.dumps({"success": False, "error": "Missing required parameter: uid"})
if not body:
return json.dumps({"success": False, "error": "Missing required parameter: body"})
folder = args.get("folder", "INBOX")
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
# Read original message to get thread context
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)")
if status != "OK" or not msg_data or msg_data[0] is None:
return json.dumps({
"success": False, "error": f"Message UID {uid} not found"
})
parsed = _parse_fetch_response(msg_data)
if parsed is None:
return json.dumps({"success": False, "error": "Failed to parse original message"})
original_msg_id = parsed.get("message_id", "")
original_subject = parsed.get("subject", "")
original_from = parsed.get("from", "")
original_date = parsed.get("date", "")
conn.logout()
except imaplib.IMAP4.error as e:
try:
conn.logout()
except Exception:
pass
return json.dumps({"success": False, "error": f"IMAP error reading original: {e}"})
# Build thread headers
subject = original_subject
if not subject.lower().startswith("re:"):
subject = f"Re: {subject}"
smtp_conn, smtp_err = _open_smtp(cfg)
if smtp_err:
return json.dumps({"success": False, "error": smtp_err})
try:
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = cfg["account"]
msg["To"] = original_from
msg["Date"] = formatdate(localtime=True)
msg["In-Reply-To"] = original_msg_id
msg["References"] = original_msg_id
msg["X-Mailer"] = "hermes-proton-mail-skill"
errors, message_id = smtp_conn.send_message(
msg, from_addr=cfg["account"], to_addrs=[original_from])
return json.dumps({
"success": True,
"message_id": message_id,
"in_reply_to": original_msg_id,
"to": original_from,
"subject": subject,
})
except smtplib.SMTPException as e:
return json.dumps({"success": False, "error": f"SMTP error: {e}"})
finally:
try:
smtp_conn.quit()
except Exception:
pass