From f8b9991207f05cf8eadd85b3dd0685048efd400b Mon Sep 17 00:00:00 2001 From: "B.A. Baracus" Date: Mon, 8 Jun 2026 18:31:07 +0200 Subject: [PATCH] =?UTF-8?q?feat(proton-mail):=20Hermes=20skill=20=E2=80=94?= =?UTF-8?q?=20IMAP/SMTP=20tools=20via=20Bridge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Full Proton Mail Bridge Hermes skill with 6 tools: - proton_mail_bridge_status — check daemon health - proton_mail_list — list inbox/folder messages - proton_mail_read — read full message by UID (body+headers) - proton_mail_search — search by subject/from/body/all - proton_mail_send — send email with CC/BCC support - proton_mail_reply — reply preserving In-Reply-To/References Implementation: pure Python stdlib (imaplib + smtplib + email), no external dependencies. 22 unit tests with mocked IMAP/SMTP. Follows architecture from ARCHITECTURE.md (section 3). Per-tool auth via PROTONMAIL_ACCOUNT + PROTONMAIL_BRIDGE_PASSWORD env vars. Bridge runs on 127.0.0.1:1143 (IMAP TLS) / 127.0.0.1:1025 (SMTP STARTTLS). --- .gitignore | 6 + skills/proton-mail/SKILL.md | 341 ++++++++++++ skills/proton-mail/references/tools.py | 726 +++++++++++++++++++++++++ skills/proton_mail/__init__.py | 1 + skills/proton_mail/tools.py | 726 +++++++++++++++++++++++++ tests/test_mail.py | 330 +++++++++++ 6 files changed, 2130 insertions(+) create mode 100644 skills/proton-mail/SKILL.md create mode 100644 skills/proton-mail/references/tools.py create mode 100644 skills/proton_mail/__init__.py create mode 100644 skills/proton_mail/tools.py create mode 100644 tests/test_mail.py diff --git a/.gitignore b/.gitignore index f815159..66dbfa6 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,12 @@ __pycache__/ .env *.env.local +# Python cache +.pytest_cache/ + +# Debug / scratch +debug_*.py + # OS .DS_Store Thumbs.db diff --git a/skills/proton-mail/SKILL.md b/skills/proton-mail/SKILL.md new file mode 100644 index 0000000..069c556 --- /dev/null +++ b/skills/proton-mail/SKILL.md @@ -0,0 +1,341 @@ +--- +name: proton-mail +description: "Proton Mail via Bridge — read, send, search, and reply to emails using the local Proton Mail Bridge daemon (IMAP 127.0.0.1:1143 / SMTP 127.0.0.1:1025)." +version: 1.0.0 +author: Trentuna / B.A. Baracus +license: MIT +platforms: [linux, macos] +metadata: + hermes: + tags: [proton, email, imap, smtp, bridge, productivity] + category: productivity + related_skills: [hermes-agent] + tools: + - proton_mail_bridge_status + - proton_mail_list + - proton_mail_read + - proton_mail_search + - proton_mail_send + - proton_mail_reply + requires_env: + - PROTONMAIL_ACCOUNT + - PROTONMAIL_BRIDGE_PASSWORD + optional_env: + - PROTONMAIL_IMAP_HOST + - PROTONMAIL_IMAP_PORT + - PROTONMAIL_SMTP_HOST + - PROTONMAIL_SMTP_PORT +--- + +# Proton Mail Bridge — Hermes Skill + +Give any Hermes agent native access to Proton Mail via the official [Proton Mail Bridge](https://proton.me/mail/bridge). + +The Bridge runs as a local daemon, handles all OpenPGP encryption/decryption transparently, and exposes standard IMAP (read) and SMTP (send) ports on localhost. This skill wraps those ports as Hermes tools. + +## How It Works + +``` +┌──────────────┐ IMAP 127.0.0.1:1143 (TLS) ┌─────────────────┐ +│ Hermes │ ───────────────────────────────► │ Proton Bridge │ +│ Agent │ │ (local daemon) │ +│ (this │ ◄─────────────────────────────── │ │ +│ skill) │ SMTP 127.0.0.1:1025 (STARTTLS) │ decrypts PGP │ +└──────────────┘ └────────┬────────┘ + │ + ▼ + Proton Servers +``` + +## Prerequisites + +1. **Proton Mail Bridge** installed and running: + - Download: https://proton.me/mail/bridge + - Linux: `protonmail-bridge --cli` + - macOS: `brew install --cask proton-mail-bridge` +2. **Proton Mail account** (Free or paid) +3. **Bridge credentials** — Bridge generates a local app password (NOT your Proton password). Get it from Bridge → Settings → Account → Mailbox configuration → Show password. + +## Environment Variables + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `PROTONMAIL_ACCOUNT` | Yes | — | Your Proton email address (e.g. `user@proton.me`) | +| `PROTONMAIL_BRIDGE_PASSWORD` | Yes | — | Bridge-generated app password | +| `PROTONMAIL_IMAP_HOST` | No | `127.0.0.1` | Bridge IMAP hostname | +| `PROTONMAIL_IMAP_PORT` | No | `1143` | Bridge IMAP port | +| `PROTONMAIL_SMTP_HOST` | No | `127.0.0.1` | Bridge SMTP hostname | +| `PROTONMAIL_SMTP_PORT` | No | `1025` | Bridge SMTP port | + +## Tool Reference + +### `proton_mail_bridge_status` + +Check that the Proton Bridge daemon is running, reachable, and authenticated. + +```json +{ + "name": "proton_mail_bridge_status", + "description": "Check Proton Mail Bridge status — running, authenticated, connected.", + "parameters": {} +} +``` + +**Returns:** +- `{"status": "running", "imap": "127.0.0.1:1143", "smtp": "127.0.0.1:1025", "authenticated": true}` +- `{"status": "stopped", "unreachable": ["IMAP 127.0.0.1:1143"]}` +- `{"status": "unconfigured", "error": "PROTONMAIL_ACCOUNT environment variable is not set"}` + +--- + +### `proton_mail_list` + +List recent messages in a mailbox folder. Returns headers only (no full body). + +```json +{ + "name": "proton_mail_list", + "description": "List recent email messages in a folder.", + "parameters": { + "type": "object", + "properties": { + "folder": {"type": "string", "description": "Mailbox folder (INBOX, Sent, Drafts, etc.)", "default": "INBOX"}, + "limit": {"type": "integer", "description": "Max messages to return (1-100)", "default": 20} + } + } +} +``` + +**Returns:** +```json +{ + "success": true, + "messages": [ + {"uid": 42, "subject": "Meeting tomorrow", "from": "alice@example.com", + "to": "you@proton.me", "date": "Tue, 4 Jun 2024 14:00:00 +0000"} + ], + "folder": "INBOX", + "total": 137 +} +``` + +--- + +### `proton_mail_read` + +Read a full email by UID — subject, all headers, and body text. + +```json +{ + "name": "proton_mail_read", + "description": "Read a full email message including body content.", + "parameters": { + "type": "object", + "properties": { + "uid": {"type": "integer", "description": "UID of the message to read"}, + "folder": {"type": "string", "description": "Mailbox folder", "default": "INBOX"} + } + } +} +``` + +**Returns:** +```json +{ + "success": true, + "uid": 42, + "subject": "Meeting tomorrow", + "from": "alice@example.com", + "to": "you@proton.me", + "date": "2024-06-04T14:00:00+00:00", + "body": "Hi, let's meet at 3pm tomorrow.\n\nBest,\nAlice", + "message_id": "", + "flags": [] +} +``` + +--- + +### `proton_mail_search` + +Search emails across a mailbox by subject, sender, body, or all fields. + +```json +{ + "name": "proton_mail_search", + "description": "Search email messages by query in a specific field.", + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query text (min 2 characters)"}, + "field": {"type": "string", "description": "Field to search (subject, from, body, or all)", "enum": ["subject", "from", "body", "all"], "default": "all"}, + "folder": {"type": "string", "description": "Mailbox folder", "default": "INBOX"}, + "limit": {"type": "integer", "description": "Max results (1-100)", "default": 20} + } + } +} +``` + +**Returns:** Same shape as `proton_mail_list` plus `"query": "Meeting"`. + +--- + +### `proton_mail_send` + +Send a new email via Bridge SMTP. + +```json +{ + "name": "proton_mail_send", + "description": "Send a new email message.", + "parameters": { + "type": "object", + "properties": { + "to": {"type": "string", "description": "Recipient email(s), comma-separated"}, + "cc": {"type": "string", "description": "CC recipient(s), comma-separated"}, + "bcc": {"type": "string", "description": "BCC recipient(s), comma-separated"}, + "subject": {"type": "string", "description": "Email subject"}, + "body": {"type": "string", "description": "Email body text (plain text)"} + } + } +} +``` + +**Returns:** +```json +{ + "success": true, + "message_id": "", + "to": "bob@example.com", + "subject": "Hello" +} +``` + +--- + +### `proton_mail_reply` + +Reply to an existing email, preserving thread context (In-Reply-To and References headers). + +```json +{ + "name": "proton_mail_reply", + "description": "Reply to an existing email, preserving thread headers.", + "parameters": { + "type": "object", + "properties": { + "uid": {"type": "integer", "description": "UID of the email to reply to"}, + "body": {"type": "string", "description": "Reply body text"}, + "folder": {"type": "string", "description": "Mailbox folder", "default": "INBOX"} + } + } +} +``` + +**Returns:** +```json +{ + "success": true, + "message_id": "", + "in_reply_to": "", + "to": "alice@example.com", + "subject": "Re: Original Subject" +} +``` + +## Setup + +### 1. Install Proton Mail Bridge + +```bash +# Linux (headless) +protonmail-bridge --cli +# Follow interactive setup — login with Proton credentials + +# macOS +brew install --wait proton-mail-bridge +``` + +### 2. Get Bridge password + +In Bridge: Settings → your account → Mailbox configuration → Show password. +This is a **Bridge-generated local password**, not your Proton password. + +### 3. Set environment variables + +```bash +export PROTONMAIL_ACCOUNT="your-email@proton.me" +export PROTONMAIL_BRIDGE_PASSWORD="bridge-generated-password" +``` + +Or add to your Hermes profile's `.env` at `~/.hermes/profiles//.env`: + +```env +PROTONMAIL_ACCOUNT=your-email@proton.me +PROTONMAIL_BRIDGE_PASSWORD=bridge-generated-password +``` + +### 4. Verify + +Call `proton_mail_bridge_status` — you should see `"status": "running"` and `"authenticated": true`. + +## Example Workflows + +**Quick inbox check:** +``` +proton_mail_list({"folder": "INBOX", "limit": 5}) +``` + +**Read and reply:** +``` +1. proton_mail_list({"limit": 10}) +2. proton_mail_read({"uid": 42}) +3. proton_mail_reply({"uid": 42, "body": "Thanks, got it!"}) +``` + +**Search and respond:** +``` +1. proton_mail_search({"query": "invoice", "field": "subject"}) +2. proton_mail_read({"uid": result.uid}) +3. proton_mail_send({"to": result.from, "subject": "Re: invoice", "body": "..."}) +``` + +**Send with CC:** +``` +proton_mail_send({ + "to": "team@example.com", + "cc": "manager@example.com", + "subject": "Status Update", + "body": "All good here." +}) +``` + +## Implementation + +The skill is implemented in pure Python using standard library modules (`imaplib`, `smtplib`, `email`). No external dependencies. + +Reference implementation at `/references/tools.py`. + +### Security + +- **Connections are localhost-only** — Bridge listens on `127.0.0.1` only +- **TLS on IMAP** — `IMAP4_SSL` connects to port 1143 +- **STARTTLS on SMTP** — explicit TLS negotiation on port 1025 +- **Bridge password is NOT your Proton password** — defense-in-depth via Bridge's separate auth +- **Credential injection prevented** — `_sanitize_search_term()` strips control characters and IMAP-special chars from user input +- **No secrets in tool calls** — credentials come from environment, never from tool arguments + +## Known Limitations + +- **Plain text body only** — HTML rendering is not available; HTML emails return the raw HTML source +- **No attachment handling yet** — `proton_mail_read` returns body text only. Attachments are present in the MIME structure but not extracted separately +- **Localhost-only** — the Bridge must run on the same machine as Hermes +- **Bridge required** — the skill doesn't work without the Bridge daemon; it can't log into Proton API directly +- **Single account** — one Bridge instance serves one account; multi-account requires multiple Bridge instances + +## References + +- [Proton Mail Bridge](https://proton.me/mail/bridge) — official download and docs +- [openclaw-protonmail-skill](https://github.com/rvacyber/openclaw-protonmail-skill) — OpenClaw analogue (TypeScript) +- [emersion/hydroxide](https://github.com/emersion/hydroxide) — third-party Bridge alternative for headless servers +- [Hermes-Proton Architecture](../ARCHITECTURE.md) — full architecture document diff --git a/skills/proton-mail/references/tools.py b/skills/proton-mail/references/tools.py new file mode 100644 index 0000000..4d2559e --- /dev/null +++ b/skills/proton-mail/references/tools.py @@ -0,0 +1,726 @@ +"""Proton Mail Bridge Hermes skill — IMAP/SMTP tool handlers. + +Requires Proton Mail Bridge running locally: + IMAP: 127.0.0.1:1143 (TLS) + SMTP: 127.0.0.1:1025 (STARTTLS) + +Credentials from env: + PROTONMAIL_ACCOUNT — email address (e.g. user@proton.me) + PROTONMAIL_BRIDGE_PASSWORD — Bridge-generated app password + PROTONMAIL_IMAP_HOST — optional, default 127.0.0.1 + PROTONMAIL_IMAP_PORT — optional, default 1143 + PROTONMAIL_SMTP_HOST — optional, default 127.0.0.1 + PROTONMAIL_SMTP_PORT — optional, default 1025 +""" + +import email +import imaplib +import json +import os +import smtplib +import socket +import time +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.header import decode_header +from email.utils import formatdate, parsedate_to_datetime +from typing import Optional + + +# ── Constants ────────────────────────────────────────────────────────── + +DEFAULT_IMAP_HOST = "127.0.0.1" +DEFAULT_IMAP_PORT = 1143 +DEFAULT_SMTP_HOST = "127.0.0.1" +DEFAULT_SMTP_PORT = 1025 +FETCH_TIMEOUT = 30 +LIST_MAX_DEFAULT = 20 + + +# ── Credential helpers ───────────────────────────────────────────────── + +def _get_config() -> dict: + """Read Bridge connection config from environment.""" + account = os.environ.get("PROTONMAIL_ACCOUNT") + password = os.environ.get("PROTONMAIL_BRIDGE_PASSWORD") + return { + "account": account, + "password": password, + "imap_host": os.environ.get("PROTONMAIL_IMAP_HOST", DEFAULT_IMAP_HOST), + "imap_port": int(os.environ.get("PROTONMAIL_IMAP_PORT", DEFAULT_IMAP_PORT)), + "smtp_host": os.environ.get("PROTONMAIL_SMTP_HOST", DEFAULT_SMTP_HOST), + "smtp_port": int(os.environ.get("PROTONMAIL_SMTP_PORT", DEFAULT_SMTP_PORT)), + } + + +def _check_credentials(cfg: dict) -> Optional[str]: + """Return error string if credentials are missing, else None.""" + if not cfg["account"]: + return "PROTONMAIL_ACCOUNT environment variable is not set" + if not cfg["password"]: + return "PROTONMAIL_BRIDGE_PASSWORD environment variable is not set" + return None + + +def _sanitize_search_term(term: str) -> str: + """Sanitize an IMAP search term — strip control characters and quotes. + + Prevents IMAP injection. Only allows printable ASCII + common unicode + letters used in subjects. + """ + # Remove characters that could break the IMAP SEARCH command + sanitized = "".join(c for c in term if c.isprintable() and c not in '()"*%\\') + return sanitized.strip() + + +# ── IMAP connection ──────────────────────────────────────────────────── + +def _open_imap(cfg: dict) -> tuple[imaplib.IMAP4_SSL, Optional[str]]: + """Connect and authenticate to Bridge IMAP. + + Returns (connection, None) on success, (None, error_string) on failure. + """ + try: + conn = imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], timeout=FETCH_TIMEOUT) + status, data = conn.login(cfg["account"], cfg["password"]) + if status != "OK": + msg = data[0].decode("utf-8", errors="replace") if data else "Login failed" + conn.logout() + return None, msg + return conn, None + except (ConnectionRefusedError, socket.timeout, OSError) as e: + return None, f"Cannot connect to Bridge IMAP at {cfg['imap_host']}:{cfg['imap_port']}: {e}" + except imaplib.IMAP4.error as e: + return None, f"IMAP error: {e}" + + +def _decode_mime_header(value: str | bytes | None) -> str: + """Decode a MIME-encoded header value to plain text.""" + if value is None: + return "" + if isinstance(value, bytes): + value = value.decode("utf-8", errors="replace") + decoded_parts = decode_header(value) + parts = [] + for part, charset in decoded_parts: + if isinstance(part, bytes): + try: + parts.append(part.decode(charset or "utf-8", errors="replace")) + except LookupError: + parts.append(part.decode("utf-8", errors="replace")) + else: + parts.append(str(part)) + return " ".join(parts) + + +def _get_body_text(msg: email.message.Message) -> str: + """Extract plain text body from an email message. + + Prefers text/plain; falls back to text/html with note. + """ + if msg.is_multipart(): + # First try text/plain + for part in msg.walk(): + ctype = part.get_content_type() + if ctype == "text/plain": + payload = part.get_payload(decode=True) + if payload: + return payload.decode("utf-8", errors="replace") + # Fall back to text/html + for part in msg.walk(): + ctype = part.get_content_type() + if ctype == "text/html": + payload = part.get_payload(decode=True) + if payload: + return payload.decode("utf-8", errors="replace") + return "" + else: + payload = msg.get_payload(decode=True) + if payload: + return payload.decode("utf-8", errors="replace") + return "" + + +def _parse_fetch_response(msg_data: list) -> dict | None: + """Parse a single IMAP FETCH response into a structured dict. + + Returns dict with uid, subject, from_, to, date, body, message_id, flags, + or None if the data can't be parsed. + """ + if not msg_data or msg_data[0] is None: + return None + + raw = msg_data[0] + if isinstance(raw, tuple) and len(raw) == 2: + uid_flags_bytes, rfc822_bytes = raw + else: + return None + + # Parse the RFC822 message + try: + msg = email.message_from_bytes(rfc822_bytes) + except Exception: + return None + + # Extract UID from the wrapper — look for "UID " keyword + uid_str = uid_flags_bytes.decode("utf-8", errors="replace") + uid = None + parts = uid_str.split() + for i, part in enumerate(parts): + keyword = part.lstrip("(").upper() + if keyword == "UID" and i + 1 < len(parts): + candidate = parts[i + 1].rstrip(")") + if candidate.isdigit(): + uid = int(candidate) + break + + subject = _decode_mime_header(msg.get("Subject", "")) + sender = _decode_mime_header(msg.get("From", "")) + recipient = _decode_mime_header(msg.get("To", "")) + date_raw = msg.get("Date", "") + message_id = msg.get("Message-ID", "") + + # Parse date + date_iso = "" + if date_raw: + try: + dt = parsedate_to_datetime(date_raw) + date_iso = dt.isoformat() + except (ValueError, TypeError): + date_iso = date_raw + + body = _get_body_text(msg) + + return { + "uid": uid, + "subject": subject, + "from": sender, + "to": recipient, + "date": date_iso, + "body": body, + "message_id": message_id, + "flags": [], + } + + +# ── SMTP connection ──────────────────────────────────────────────────── + +def _open_smtp(cfg: dict) -> tuple[smtplib.SMTP | None, str | None]: + """Connect and authenticate to Bridge SMTP. + + Returns (connection, None) on success, (None, error_string) on failure. + """ + try: + conn = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=FETCH_TIMEOUT) + conn.starttls() + conn.login(cfg["account"], cfg["password"]) + return conn, None + except (ConnectionRefusedError, socket.timeout, OSError) as e: + return None, f"Cannot connect to Bridge SMTP: {e}" + except smtplib.SMTPException as e: + return None, f"SMTP error: {e}" + + +# ── Port check ───────────────────────────────────────────────────────── + +def _check_port_open(host: str, port: int, timeout: float = 3.0) -> bool: + """Check if a TCP port is open (Bridge running check).""" + try: + s = socket.create_connection((host, port), timeout=timeout) + s.close() + return True + except (ConnectionRefusedError, socket.timeout, OSError): + return False + + +# ── Tool Handlers ────────────────────────────────────────────────────── + +def proton_mail_bridge_status(args: dict) -> str: + """Check Proton Bridge status — running, authenticated, and reachable. + + Tool schema: + name: proton_mail_bridge_status + description: Check if Proton Mail Bridge daemon is running and reachable + parameters: {} + """ + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"status": "unconfigured", "error": cred_err}) + + imap_up = _check_port_open(cfg["imap_host"], cfg["imap_port"]) + smtp_up = _check_port_open(cfg["smtp_host"], cfg["smtp_port"]) + + if imap_up and smtp_up: + # Try a real IMAP login to confirm auth works + conn, err = _open_imap(cfg) + if conn: + conn.logout() + return json.dumps({ + "status": "running", + "imap": f"{cfg['imap_host']}:{cfg['imap_port']}", + "smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}", + "authenticated": True, + }) + else: + return json.dumps({ + "status": "running", + "imap": f"{cfg['imap_host']}:{cfg['imap_port']}", + "smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}", + "authenticated": False, + "auth_error": err, + }) + + ports = [] + if not imap_up: + ports.append(f"IMAP {cfg['imap_host']}:{cfg['imap_port']}") + if not smtp_up: + ports.append(f"SMTP {cfg['smtp_host']}:{cfg['smtp_port']}") + + return json.dumps({ + "status": "stopped", + "unreachable": ports, + }) + + +def proton_mail_list(args: dict) -> str: + """List messages in a mailbox folder. + + Tool schema: + name: proton_mail_list + description: List recent email messages in a folder (default INBOX) + parameters: + type: object + properties: + folder: + type: string + description: Mailbox folder (INBOX, Sent, Drafts, etc.) + default: INBOX + limit: + type: integer + description: Max messages to return (1-100) + default: 20 + """ + folder = args.get("folder", "INBOX") + limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100) + limit = max(limit, 1) + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + status, data = conn.select(f'"{folder}"', readonly=True) + if status != "OK": + return json.dumps({ + "success": False, "error": f"Cannot select folder '{folder}'" + }) + + # Search all messages, newest first + status, data = conn.search(None, "ALL") + if status != "OK": + return json.dumps({ + "success": False, "error": "IMAP search failed" + }) + + uids = data[0].split() if data[0] else [] + if not uids: + return json.dumps({ + "success": True, "messages": [], "folder": folder + }) + + # Take the N most recent (last N in the list) + recent_uids = uids[-limit:] + + messages = [] + for uid_bytes in recent_uids: + uid = uid_bytes.decode("ascii") + status, msg_data = conn.fetch(uid, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])") + if status != "OK" or not msg_data or msg_data[0] is None: + continue + + raw = msg_data[0] + if isinstance(raw, tuple) and len(raw) == 2: + header_bytes = raw[1] + msg = email.message_from_bytes(header_bytes) + messages.append({ + "uid": int(uid), + "subject": _decode_mime_header(msg.get("Subject", "(no subject)")), + "from": _decode_mime_header(msg.get("From", "")), + "to": _decode_mime_header(msg.get("To", "")), + "date": msg.get("Date", ""), + }) + + return json.dumps({ + "success": True, + "messages": messages, + "folder": folder, + "total": len(uids), + }) + + except imaplib.IMAP4.error as e: + return json.dumps({"success": False, "error": f"IMAP error: {e}"}) + finally: + try: + conn.logout() + except Exception: + pass + + +def proton_mail_read(args: dict) -> str: + """Read a full email message by UID. + + Tool schema: + name: proton_mail_read + description: Read a full email message including body content + parameters: + type: object + properties: + uid: + type: integer + description: UID of the message to read + folder: + type: string + description: Mailbox folder (default INBOX) + default: INBOX + """ + uid = args.get("uid") + if uid is None: + return json.dumps({"success": False, "error": "Missing required parameter: uid"}) + + folder = args.get("folder", "INBOX") + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + conn.select(f'"{folder}"', readonly=True) + status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)") + if status != "OK" or not msg_data or msg_data[0] is None: + return json.dumps({ + "success": False, "error": f"Message UID {uid} not found in {folder}" + }) + + parsed = _parse_fetch_response(msg_data) + if parsed is None: + return json.dumps({"success": False, "error": "Failed to parse message"}) + + parsed["success"] = True + return json.dumps(parsed) + + except imaplib.IMAP4.error as e: + return json.dumps({"success": False, "error": f"IMAP error: {e}"}) + finally: + try: + conn.logout() + except Exception: + pass + + +def proton_mail_search(args: dict) -> str: + """Search messages across a mailbox folder. + + Tool schema: + name: proton_mail_search + description: Search email messages by query in a specific field + parameters: + type: object + properties: + query: + type: string + description: Search query text (min 2 characters) + field: + type: string + description: Field to search (subject, from, body, or all) + enum: [subject, from, body, all] + default: all + folder: + type: string + description: Mailbox folder (default INBOX) + default: INBOX + limit: + type: integer + description: Max results (1-100) + default: 20 + """ + query = args.get("query", "") + if not query or len(query.strip()) < 2: + return json.dumps({ + "success": False, + "error": "Search query must be at least 2 characters", + }) + + field = args.get("field", "all") + folder = args.get("folder", "INBOX") + limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100) + limit = max(limit, 1) + + query = _sanitize_search_term(query) + cfg = _get_config() + + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + conn.select(f'"{folder}"', readonly=True) + + # Build IMAP SEARCH criteria + search_criteria = [] + if field == "subject": + search_criteria = ["SUBJECT", query] + elif field == "from": + search_criteria = ["FROM", query] + elif field == "body": + search_criteria = ["BODY", query] + else: + # Search across all fields + search_criteria = ["OR", "OR", "SUBJECT", query, "FROM", query, "BODY", query] + + status, data = conn.search(None, *search_criteria) + if status != "OK": + return json.dumps({"success": False, "error": "IMAP search failed"}) + + uid_list = data[0].split() if data[0] else [] + if not uid_list: + return json.dumps({"success": True, "messages": [], "folder": folder}) + + recent = uid_list[-limit:] + + messages = [] + for uid_bytes in recent: + uid_str = uid_bytes.decode("ascii") + status, msg_data = conn.fetch(uid_str, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])") + if status != "OK" or not msg_data or msg_data[0] is None: + continue + + raw = msg_data[0] + if isinstance(raw, tuple) and len(raw) == 2: + header_bytes = raw[1] + msg = email.message_from_bytes(header_bytes) + messages.append({ + "uid": int(uid_str), + "subject": _decode_mime_header(msg.get("Subject", "(no subject)")), + "from": _decode_mime_header(msg.get("From", "")), + "to": _decode_mime_header(msg.get("To", "")), + "date": msg.get("Date", ""), + }) + + return json.dumps({ + "success": True, + "messages": messages, + "folder": folder, + "query": query, + }) + + except imaplib.IMAP4.error as e: + return json.dumps({"success": False, "error": f"IMAP error: {e}"}) + finally: + try: + conn.logout() + except Exception: + pass + + +def proton_mail_send(args: dict) -> str: + """Send a new email via Proton Bridge SMTP. + + Tool schema: + name: proton_mail_send + description: Send a new email message + parameters: + type: object + properties: + to: + type: string + description: Recipient email address(es), comma-separated + cc: + type: string + description: CC recipient(s), comma-separated + bcc: + type: string + description: BCC recipient(s), comma-separated + subject: + type: string + description: Email subject line + body: + type: string + description: Email body text (plain text) + """ + to = args.get("to", "") + cc = args.get("cc", "") + bcc = args.get("bcc", "") + subject = args.get("subject", "").strip() + body = args.get("body", "") + + if not to: + return json.dumps({"success": False, "error": "Missing required parameter: to"}) + if not subject: + return json.dumps({"success": False, "error": "Missing required parameter: subject"}) + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_smtp(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + msg = MIMEText(body, "plain", "utf-8") + msg["Subject"] = subject + msg["From"] = cfg["account"] + msg["To"] = to + msg["Date"] = formatdate(localtime=True) + msg["X-Mailer"] = "hermes-proton-mail-skill" + + if cc: + msg["Cc"] = cc + + recipients = [r.strip() for r in to.split(",") if r.strip()] + if cc: + recipients.extend(r.strip() for r in cc.split(",") if r.strip()) + if bcc: + recipients.extend(r.strip() for r in bcc.split(",") if r.strip()) + bcc_list = [r.strip() for r in bcc.split(",") if r.strip()] + else: + bcc_list = [] + + # Send — SMTP lib handles BCC by not adding to the envelope + errors, message_id = conn.send_message(msg, from_addr=cfg["account"], + to_addrs=recipients) + + return json.dumps({ + "success": True, + "message_id": message_id, + "to": to, + "subject": subject, + }) + + except smtplib.SMTPException as e: + return json.dumps({"success": False, "error": f"SMTP error: {e}"}) + finally: + try: + conn.quit() + except Exception: + pass + + +def proton_mail_reply(args: dict) -> str: + """Reply to an existing email by UID. + + Tool schema: + name: proton_mail_reply + description: Reply to an existing email, preserving thread headers + parameters: + type: object + properties: + uid: + type: integer + description: UID of the message to reply to + body: + type: string + description: Reply body text + folder: + type: string + description: Mailbox folder (default INBOX) + default: INBOX + """ + uid = args.get("uid") + body = args.get("body", "") + + if uid is None: + return json.dumps({"success": False, "error": "Missing required parameter: uid"}) + if not body: + return json.dumps({"success": False, "error": "Missing required parameter: body"}) + + folder = args.get("folder", "INBOX") + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + # Read original message to get thread context + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + conn.select(f'"{folder}"', readonly=True) + status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)") + if status != "OK" or not msg_data or msg_data[0] is None: + return json.dumps({ + "success": False, "error": f"Message UID {uid} not found" + }) + + parsed = _parse_fetch_response(msg_data) + if parsed is None: + return json.dumps({"success": False, "error": "Failed to parse original message"}) + + original_msg_id = parsed.get("message_id", "") + original_subject = parsed.get("subject", "") + original_from = parsed.get("from", "") + original_date = parsed.get("date", "") + + conn.logout() + except imaplib.IMAP4.error as e: + try: + conn.logout() + except Exception: + pass + return json.dumps({"success": False, "error": f"IMAP error reading original: {e}"}) + + # Build thread headers + subject = original_subject + if not subject.lower().startswith("re:"): + subject = f"Re: {subject}" + + smtp_conn, smtp_err = _open_smtp(cfg) + if smtp_err: + return json.dumps({"success": False, "error": smtp_err}) + + try: + msg = MIMEText(body, "plain", "utf-8") + msg["Subject"] = subject + msg["From"] = cfg["account"] + msg["To"] = original_from + msg["Date"] = formatdate(localtime=True) + msg["In-Reply-To"] = original_msg_id + msg["References"] = original_msg_id + msg["X-Mailer"] = "hermes-proton-mail-skill" + + errors, message_id = smtp_conn.send_message( + msg, from_addr=cfg["account"], to_addrs=[original_from]) + + return json.dumps({ + "success": True, + "message_id": message_id, + "in_reply_to": original_msg_id, + "to": original_from, + "subject": subject, + }) + + except smtplib.SMTPException as e: + return json.dumps({"success": False, "error": f"SMTP error: {e}"}) + finally: + try: + smtp_conn.quit() + except Exception: + pass diff --git a/skills/proton_mail/__init__.py b/skills/proton_mail/__init__.py new file mode 100644 index 0000000..22bdd0e --- /dev/null +++ b/skills/proton_mail/__init__.py @@ -0,0 +1 @@ +# proton-mail — Hermes skill for Proton Mail via Bridge IMAP/SMTP diff --git a/skills/proton_mail/tools.py b/skills/proton_mail/tools.py new file mode 100644 index 0000000..4d2559e --- /dev/null +++ b/skills/proton_mail/tools.py @@ -0,0 +1,726 @@ +"""Proton Mail Bridge Hermes skill — IMAP/SMTP tool handlers. + +Requires Proton Mail Bridge running locally: + IMAP: 127.0.0.1:1143 (TLS) + SMTP: 127.0.0.1:1025 (STARTTLS) + +Credentials from env: + PROTONMAIL_ACCOUNT — email address (e.g. user@proton.me) + PROTONMAIL_BRIDGE_PASSWORD — Bridge-generated app password + PROTONMAIL_IMAP_HOST — optional, default 127.0.0.1 + PROTONMAIL_IMAP_PORT — optional, default 1143 + PROTONMAIL_SMTP_HOST — optional, default 127.0.0.1 + PROTONMAIL_SMTP_PORT — optional, default 1025 +""" + +import email +import imaplib +import json +import os +import smtplib +import socket +import time +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.header import decode_header +from email.utils import formatdate, parsedate_to_datetime +from typing import Optional + + +# ── Constants ────────────────────────────────────────────────────────── + +DEFAULT_IMAP_HOST = "127.0.0.1" +DEFAULT_IMAP_PORT = 1143 +DEFAULT_SMTP_HOST = "127.0.0.1" +DEFAULT_SMTP_PORT = 1025 +FETCH_TIMEOUT = 30 +LIST_MAX_DEFAULT = 20 + + +# ── Credential helpers ───────────────────────────────────────────────── + +def _get_config() -> dict: + """Read Bridge connection config from environment.""" + account = os.environ.get("PROTONMAIL_ACCOUNT") + password = os.environ.get("PROTONMAIL_BRIDGE_PASSWORD") + return { + "account": account, + "password": password, + "imap_host": os.environ.get("PROTONMAIL_IMAP_HOST", DEFAULT_IMAP_HOST), + "imap_port": int(os.environ.get("PROTONMAIL_IMAP_PORT", DEFAULT_IMAP_PORT)), + "smtp_host": os.environ.get("PROTONMAIL_SMTP_HOST", DEFAULT_SMTP_HOST), + "smtp_port": int(os.environ.get("PROTONMAIL_SMTP_PORT", DEFAULT_SMTP_PORT)), + } + + +def _check_credentials(cfg: dict) -> Optional[str]: + """Return error string if credentials are missing, else None.""" + if not cfg["account"]: + return "PROTONMAIL_ACCOUNT environment variable is not set" + if not cfg["password"]: + return "PROTONMAIL_BRIDGE_PASSWORD environment variable is not set" + return None + + +def _sanitize_search_term(term: str) -> str: + """Sanitize an IMAP search term — strip control characters and quotes. + + Prevents IMAP injection. Only allows printable ASCII + common unicode + letters used in subjects. + """ + # Remove characters that could break the IMAP SEARCH command + sanitized = "".join(c for c in term if c.isprintable() and c not in '()"*%\\') + return sanitized.strip() + + +# ── IMAP connection ──────────────────────────────────────────────────── + +def _open_imap(cfg: dict) -> tuple[imaplib.IMAP4_SSL, Optional[str]]: + """Connect and authenticate to Bridge IMAP. + + Returns (connection, None) on success, (None, error_string) on failure. + """ + try: + conn = imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], timeout=FETCH_TIMEOUT) + status, data = conn.login(cfg["account"], cfg["password"]) + if status != "OK": + msg = data[0].decode("utf-8", errors="replace") if data else "Login failed" + conn.logout() + return None, msg + return conn, None + except (ConnectionRefusedError, socket.timeout, OSError) as e: + return None, f"Cannot connect to Bridge IMAP at {cfg['imap_host']}:{cfg['imap_port']}: {e}" + except imaplib.IMAP4.error as e: + return None, f"IMAP error: {e}" + + +def _decode_mime_header(value: str | bytes | None) -> str: + """Decode a MIME-encoded header value to plain text.""" + if value is None: + return "" + if isinstance(value, bytes): + value = value.decode("utf-8", errors="replace") + decoded_parts = decode_header(value) + parts = [] + for part, charset in decoded_parts: + if isinstance(part, bytes): + try: + parts.append(part.decode(charset or "utf-8", errors="replace")) + except LookupError: + parts.append(part.decode("utf-8", errors="replace")) + else: + parts.append(str(part)) + return " ".join(parts) + + +def _get_body_text(msg: email.message.Message) -> str: + """Extract plain text body from an email message. + + Prefers text/plain; falls back to text/html with note. + """ + if msg.is_multipart(): + # First try text/plain + for part in msg.walk(): + ctype = part.get_content_type() + if ctype == "text/plain": + payload = part.get_payload(decode=True) + if payload: + return payload.decode("utf-8", errors="replace") + # Fall back to text/html + for part in msg.walk(): + ctype = part.get_content_type() + if ctype == "text/html": + payload = part.get_payload(decode=True) + if payload: + return payload.decode("utf-8", errors="replace") + return "" + else: + payload = msg.get_payload(decode=True) + if payload: + return payload.decode("utf-8", errors="replace") + return "" + + +def _parse_fetch_response(msg_data: list) -> dict | None: + """Parse a single IMAP FETCH response into a structured dict. + + Returns dict with uid, subject, from_, to, date, body, message_id, flags, + or None if the data can't be parsed. + """ + if not msg_data or msg_data[0] is None: + return None + + raw = msg_data[0] + if isinstance(raw, tuple) and len(raw) == 2: + uid_flags_bytes, rfc822_bytes = raw + else: + return None + + # Parse the RFC822 message + try: + msg = email.message_from_bytes(rfc822_bytes) + except Exception: + return None + + # Extract UID from the wrapper — look for "UID " keyword + uid_str = uid_flags_bytes.decode("utf-8", errors="replace") + uid = None + parts = uid_str.split() + for i, part in enumerate(parts): + keyword = part.lstrip("(").upper() + if keyword == "UID" and i + 1 < len(parts): + candidate = parts[i + 1].rstrip(")") + if candidate.isdigit(): + uid = int(candidate) + break + + subject = _decode_mime_header(msg.get("Subject", "")) + sender = _decode_mime_header(msg.get("From", "")) + recipient = _decode_mime_header(msg.get("To", "")) + date_raw = msg.get("Date", "") + message_id = msg.get("Message-ID", "") + + # Parse date + date_iso = "" + if date_raw: + try: + dt = parsedate_to_datetime(date_raw) + date_iso = dt.isoformat() + except (ValueError, TypeError): + date_iso = date_raw + + body = _get_body_text(msg) + + return { + "uid": uid, + "subject": subject, + "from": sender, + "to": recipient, + "date": date_iso, + "body": body, + "message_id": message_id, + "flags": [], + } + + +# ── SMTP connection ──────────────────────────────────────────────────── + +def _open_smtp(cfg: dict) -> tuple[smtplib.SMTP | None, str | None]: + """Connect and authenticate to Bridge SMTP. + + Returns (connection, None) on success, (None, error_string) on failure. + """ + try: + conn = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=FETCH_TIMEOUT) + conn.starttls() + conn.login(cfg["account"], cfg["password"]) + return conn, None + except (ConnectionRefusedError, socket.timeout, OSError) as e: + return None, f"Cannot connect to Bridge SMTP: {e}" + except smtplib.SMTPException as e: + return None, f"SMTP error: {e}" + + +# ── Port check ───────────────────────────────────────────────────────── + +def _check_port_open(host: str, port: int, timeout: float = 3.0) -> bool: + """Check if a TCP port is open (Bridge running check).""" + try: + s = socket.create_connection((host, port), timeout=timeout) + s.close() + return True + except (ConnectionRefusedError, socket.timeout, OSError): + return False + + +# ── Tool Handlers ────────────────────────────────────────────────────── + +def proton_mail_bridge_status(args: dict) -> str: + """Check Proton Bridge status — running, authenticated, and reachable. + + Tool schema: + name: proton_mail_bridge_status + description: Check if Proton Mail Bridge daemon is running and reachable + parameters: {} + """ + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"status": "unconfigured", "error": cred_err}) + + imap_up = _check_port_open(cfg["imap_host"], cfg["imap_port"]) + smtp_up = _check_port_open(cfg["smtp_host"], cfg["smtp_port"]) + + if imap_up and smtp_up: + # Try a real IMAP login to confirm auth works + conn, err = _open_imap(cfg) + if conn: + conn.logout() + return json.dumps({ + "status": "running", + "imap": f"{cfg['imap_host']}:{cfg['imap_port']}", + "smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}", + "authenticated": True, + }) + else: + return json.dumps({ + "status": "running", + "imap": f"{cfg['imap_host']}:{cfg['imap_port']}", + "smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}", + "authenticated": False, + "auth_error": err, + }) + + ports = [] + if not imap_up: + ports.append(f"IMAP {cfg['imap_host']}:{cfg['imap_port']}") + if not smtp_up: + ports.append(f"SMTP {cfg['smtp_host']}:{cfg['smtp_port']}") + + return json.dumps({ + "status": "stopped", + "unreachable": ports, + }) + + +def proton_mail_list(args: dict) -> str: + """List messages in a mailbox folder. + + Tool schema: + name: proton_mail_list + description: List recent email messages in a folder (default INBOX) + parameters: + type: object + properties: + folder: + type: string + description: Mailbox folder (INBOX, Sent, Drafts, etc.) + default: INBOX + limit: + type: integer + description: Max messages to return (1-100) + default: 20 + """ + folder = args.get("folder", "INBOX") + limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100) + limit = max(limit, 1) + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + status, data = conn.select(f'"{folder}"', readonly=True) + if status != "OK": + return json.dumps({ + "success": False, "error": f"Cannot select folder '{folder}'" + }) + + # Search all messages, newest first + status, data = conn.search(None, "ALL") + if status != "OK": + return json.dumps({ + "success": False, "error": "IMAP search failed" + }) + + uids = data[0].split() if data[0] else [] + if not uids: + return json.dumps({ + "success": True, "messages": [], "folder": folder + }) + + # Take the N most recent (last N in the list) + recent_uids = uids[-limit:] + + messages = [] + for uid_bytes in recent_uids: + uid = uid_bytes.decode("ascii") + status, msg_data = conn.fetch(uid, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])") + if status != "OK" or not msg_data or msg_data[0] is None: + continue + + raw = msg_data[0] + if isinstance(raw, tuple) and len(raw) == 2: + header_bytes = raw[1] + msg = email.message_from_bytes(header_bytes) + messages.append({ + "uid": int(uid), + "subject": _decode_mime_header(msg.get("Subject", "(no subject)")), + "from": _decode_mime_header(msg.get("From", "")), + "to": _decode_mime_header(msg.get("To", "")), + "date": msg.get("Date", ""), + }) + + return json.dumps({ + "success": True, + "messages": messages, + "folder": folder, + "total": len(uids), + }) + + except imaplib.IMAP4.error as e: + return json.dumps({"success": False, "error": f"IMAP error: {e}"}) + finally: + try: + conn.logout() + except Exception: + pass + + +def proton_mail_read(args: dict) -> str: + """Read a full email message by UID. + + Tool schema: + name: proton_mail_read + description: Read a full email message including body content + parameters: + type: object + properties: + uid: + type: integer + description: UID of the message to read + folder: + type: string + description: Mailbox folder (default INBOX) + default: INBOX + """ + uid = args.get("uid") + if uid is None: + return json.dumps({"success": False, "error": "Missing required parameter: uid"}) + + folder = args.get("folder", "INBOX") + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + conn.select(f'"{folder}"', readonly=True) + status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)") + if status != "OK" or not msg_data or msg_data[0] is None: + return json.dumps({ + "success": False, "error": f"Message UID {uid} not found in {folder}" + }) + + parsed = _parse_fetch_response(msg_data) + if parsed is None: + return json.dumps({"success": False, "error": "Failed to parse message"}) + + parsed["success"] = True + return json.dumps(parsed) + + except imaplib.IMAP4.error as e: + return json.dumps({"success": False, "error": f"IMAP error: {e}"}) + finally: + try: + conn.logout() + except Exception: + pass + + +def proton_mail_search(args: dict) -> str: + """Search messages across a mailbox folder. + + Tool schema: + name: proton_mail_search + description: Search email messages by query in a specific field + parameters: + type: object + properties: + query: + type: string + description: Search query text (min 2 characters) + field: + type: string + description: Field to search (subject, from, body, or all) + enum: [subject, from, body, all] + default: all + folder: + type: string + description: Mailbox folder (default INBOX) + default: INBOX + limit: + type: integer + description: Max results (1-100) + default: 20 + """ + query = args.get("query", "") + if not query or len(query.strip()) < 2: + return json.dumps({ + "success": False, + "error": "Search query must be at least 2 characters", + }) + + field = args.get("field", "all") + folder = args.get("folder", "INBOX") + limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100) + limit = max(limit, 1) + + query = _sanitize_search_term(query) + cfg = _get_config() + + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + conn.select(f'"{folder}"', readonly=True) + + # Build IMAP SEARCH criteria + search_criteria = [] + if field == "subject": + search_criteria = ["SUBJECT", query] + elif field == "from": + search_criteria = ["FROM", query] + elif field == "body": + search_criteria = ["BODY", query] + else: + # Search across all fields + search_criteria = ["OR", "OR", "SUBJECT", query, "FROM", query, "BODY", query] + + status, data = conn.search(None, *search_criteria) + if status != "OK": + return json.dumps({"success": False, "error": "IMAP search failed"}) + + uid_list = data[0].split() if data[0] else [] + if not uid_list: + return json.dumps({"success": True, "messages": [], "folder": folder}) + + recent = uid_list[-limit:] + + messages = [] + for uid_bytes in recent: + uid_str = uid_bytes.decode("ascii") + status, msg_data = conn.fetch(uid_str, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])") + if status != "OK" or not msg_data or msg_data[0] is None: + continue + + raw = msg_data[0] + if isinstance(raw, tuple) and len(raw) == 2: + header_bytes = raw[1] + msg = email.message_from_bytes(header_bytes) + messages.append({ + "uid": int(uid_str), + "subject": _decode_mime_header(msg.get("Subject", "(no subject)")), + "from": _decode_mime_header(msg.get("From", "")), + "to": _decode_mime_header(msg.get("To", "")), + "date": msg.get("Date", ""), + }) + + return json.dumps({ + "success": True, + "messages": messages, + "folder": folder, + "query": query, + }) + + except imaplib.IMAP4.error as e: + return json.dumps({"success": False, "error": f"IMAP error: {e}"}) + finally: + try: + conn.logout() + except Exception: + pass + + +def proton_mail_send(args: dict) -> str: + """Send a new email via Proton Bridge SMTP. + + Tool schema: + name: proton_mail_send + description: Send a new email message + parameters: + type: object + properties: + to: + type: string + description: Recipient email address(es), comma-separated + cc: + type: string + description: CC recipient(s), comma-separated + bcc: + type: string + description: BCC recipient(s), comma-separated + subject: + type: string + description: Email subject line + body: + type: string + description: Email body text (plain text) + """ + to = args.get("to", "") + cc = args.get("cc", "") + bcc = args.get("bcc", "") + subject = args.get("subject", "").strip() + body = args.get("body", "") + + if not to: + return json.dumps({"success": False, "error": "Missing required parameter: to"}) + if not subject: + return json.dumps({"success": False, "error": "Missing required parameter: subject"}) + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + conn, err = _open_smtp(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + msg = MIMEText(body, "plain", "utf-8") + msg["Subject"] = subject + msg["From"] = cfg["account"] + msg["To"] = to + msg["Date"] = formatdate(localtime=True) + msg["X-Mailer"] = "hermes-proton-mail-skill" + + if cc: + msg["Cc"] = cc + + recipients = [r.strip() for r in to.split(",") if r.strip()] + if cc: + recipients.extend(r.strip() for r in cc.split(",") if r.strip()) + if bcc: + recipients.extend(r.strip() for r in bcc.split(",") if r.strip()) + bcc_list = [r.strip() for r in bcc.split(",") if r.strip()] + else: + bcc_list = [] + + # Send — SMTP lib handles BCC by not adding to the envelope + errors, message_id = conn.send_message(msg, from_addr=cfg["account"], + to_addrs=recipients) + + return json.dumps({ + "success": True, + "message_id": message_id, + "to": to, + "subject": subject, + }) + + except smtplib.SMTPException as e: + return json.dumps({"success": False, "error": f"SMTP error: {e}"}) + finally: + try: + conn.quit() + except Exception: + pass + + +def proton_mail_reply(args: dict) -> str: + """Reply to an existing email by UID. + + Tool schema: + name: proton_mail_reply + description: Reply to an existing email, preserving thread headers + parameters: + type: object + properties: + uid: + type: integer + description: UID of the message to reply to + body: + type: string + description: Reply body text + folder: + type: string + description: Mailbox folder (default INBOX) + default: INBOX + """ + uid = args.get("uid") + body = args.get("body", "") + + if uid is None: + return json.dumps({"success": False, "error": "Missing required parameter: uid"}) + if not body: + return json.dumps({"success": False, "error": "Missing required parameter: body"}) + + folder = args.get("folder", "INBOX") + + cfg = _get_config() + cred_err = _check_credentials(cfg) + if cred_err: + return json.dumps({"success": False, "error": cred_err}) + + # Read original message to get thread context + conn, err = _open_imap(cfg) + if err: + return json.dumps({"success": False, "error": err}) + + try: + conn.select(f'"{folder}"', readonly=True) + status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)") + if status != "OK" or not msg_data or msg_data[0] is None: + return json.dumps({ + "success": False, "error": f"Message UID {uid} not found" + }) + + parsed = _parse_fetch_response(msg_data) + if parsed is None: + return json.dumps({"success": False, "error": "Failed to parse original message"}) + + original_msg_id = parsed.get("message_id", "") + original_subject = parsed.get("subject", "") + original_from = parsed.get("from", "") + original_date = parsed.get("date", "") + + conn.logout() + except imaplib.IMAP4.error as e: + try: + conn.logout() + except Exception: + pass + return json.dumps({"success": False, "error": f"IMAP error reading original: {e}"}) + + # Build thread headers + subject = original_subject + if not subject.lower().startswith("re:"): + subject = f"Re: {subject}" + + smtp_conn, smtp_err = _open_smtp(cfg) + if smtp_err: + return json.dumps({"success": False, "error": smtp_err}) + + try: + msg = MIMEText(body, "plain", "utf-8") + msg["Subject"] = subject + msg["From"] = cfg["account"] + msg["To"] = original_from + msg["Date"] = formatdate(localtime=True) + msg["In-Reply-To"] = original_msg_id + msg["References"] = original_msg_id + msg["X-Mailer"] = "hermes-proton-mail-skill" + + errors, message_id = smtp_conn.send_message( + msg, from_addr=cfg["account"], to_addrs=[original_from]) + + return json.dumps({ + "success": True, + "message_id": message_id, + "in_reply_to": original_msg_id, + "to": original_from, + "subject": subject, + }) + + except smtplib.SMTPException as e: + return json.dumps({"success": False, "error": f"SMTP error: {e}"}) + finally: + try: + smtp_conn.quit() + except Exception: + pass diff --git a/tests/test_mail.py b/tests/test_mail.py new file mode 100644 index 0000000..2cf065e --- /dev/null +++ b/tests/test_mail.py @@ -0,0 +1,330 @@ +"""Tests for the proton-mail Hermes skill — IMAP/SMTP via Proton Mail Bridge. + +All tests mock imaplib and smtplib. Real Bridge integration is behind a +pytest.mark.skipif guard for systems with Bridge running. +""" + +import json +import pytest +from unittest.mock import patch, MagicMock, call +from skills.proton_mail import tools as mail_tools + + +# ── Fixtures ────────────────────────────────────────────────────────────── + +@pytest.fixture +def mock_imap(): + """Mock IMAP4_SSL connection returning a logged-in session.""" + with patch("skills.proton_mail.tools.imaplib.IMAP4_SSL") as mock: + inst = mock.return_value + inst.login.return_value = ("OK", [b"Logged in"]) + inst.select.return_value = ("OK", [b"42"]) + inst.search.return_value = ("OK", [b"1 2 3"]) + # Default fetch: one entry (UID 1, basic headers) + inst.fetch.return_value = ( + "OK", + [_make_fetch_response(1, "Default Subject", "default@test.com", + "Mon, 1 Jan 2024 10:00:00 +0000")], + ) + # Default uid fetch + inst.uid.return_value = ( + "OK", + [_make_fetch_response(1, "Default Subject", "default@test.com", + "Mon, 1 Jan 2024 10:00:00 +0000")], + ) + inst.logout.return_value = ("OK", [b"Bye"]) + yield inst + + +@pytest.fixture +def mock_smtp(): + """Mock SMTP connection returning a logged-in session.""" + with patch("skills.proton_mail.tools.smtplib.SMTP") as mock: + inst = mock.return_value + inst.starttls.return_value = None + inst.login.return_value = None + inst.send_message.return_value = ({}, "test-message-id@bridge") + yield inst + + +@pytest.fixture +def mock_env_vars(): + """Set Proton Bridge env vars for tests.""" + with patch.dict("os.environ", { + "PROTONMAIL_ACCOUNT": "test@proton.me", + "PROTONMAIL_BRIDGE_PASSWORD": "bridge-password-123", + }): + yield + + +# ── Helpers ─────────────────────────────────────────────────────────────── + +def _make_fetch_response(uid: int, subject: str, sender: str, date: str, + body: str = "Hello world") -> tuple: + """Build a realistic IMAP FETCH response tuple. + + Returns (uid_data, rfc822_data) as a single-element list per IMAP + protocol — each list item is (header_bytes, message_bytes). + """ + headers = ( + f"Subject: {subject}\r\n" + f"From: {sender}\r\n" + f"To: recipient@proton.me\r\n" + f"Date: {date}\r\n" + f"Message-ID: \r\n" + f"MIME-Version: 1.0\r\n" + f"Content-Type: text/plain; charset=utf-8\r\n" + f"Content-Transfer-Encoding: 7bit\r\n" + f"\r\n" + ).encode("utf-8") + msg_bytes = headers + body.encode("utf-8") + uid_wrapper = f"1 (UID {uid} FLAGS (\\Seen))\r\n".encode("utf-8") + return (uid_wrapper, msg_bytes) + + +# ── Bridge Status ───────────────────────────────────────────────────────── + +class TestBridgeStatus: + def test_bridge_running(self, mock_env_vars): + """Bridge status returns 'running' when IMAP port is reachable.""" + with patch("skills.proton_mail.tools._check_port_open", + return_value=True): + result = json.loads(mail_tools.proton_mail_bridge_status({})) + assert result["status"] == "running" + + def test_bridge_not_running(self, mock_env_vars): + """Bridge status returns 'stopped' when IMAP port is unreachable.""" + with patch("skills.proton_mail.tools._check_port_open", + return_value=False): + result = json.loads(mail_tools.proton_mail_bridge_status({})) + assert result["status"] == "stopped" + + def test_missing_env_vars(self): + """Bridge status returns error when credentials are missing.""" + with patch.dict("os.environ", {}, clear=True): + result = json.loads(mail_tools.proton_mail_bridge_status({})) + assert "error" in result + + +# ── Mail List ───────────────────────────────────────────────────────────── + +class TestMailList: + def test_list_returns_messages(self, mock_imap, mock_env_vars): + """List returns parsed messages with subject, from, date, uid.""" + mock_imap.search.return_value = ("OK", [b"1 2 3"]) + mock_imap.fetch.side_effect = [ + ("OK", [_make_fetch_response(1, "Subject A", "a@test.com", "Mon, 1 Jan 2024 10:00:00 +0000")]), + ("OK", [_make_fetch_response(2, "Subject B", "b@test.com", "Tue, 2 Jan 2024 11:00:00 +0000")]), + ("OK", [_make_fetch_response(3, "Subject C", "c@test.com", "Wed, 3 Jan 2024 12:00:00 +0000")]), + ] + + result = json.loads(mail_tools.proton_mail_list({"folder": "INBOX", "limit": 3})) + + assert result["success"] is True + assert len(result["messages"]) == 3 + assert result["messages"][0]["subject"] == "Subject A" + assert result["messages"][0]["from"] == "a@test.com" + assert result["messages"][2]["uid"] == 3 + assert result["folder"] == "INBOX" + + def test_list_empty_mailbox(self, mock_imap, mock_env_vars): + """List returns empty array when no messages match.""" + mock_imap.search.return_value = ("OK", [b""]) + + result = json.loads(mail_tools.proton_mail_list({"folder": "INBOX", "limit": 10})) + + assert result["success"] is True + assert result["messages"] == [] + + def test_list_custom_folder(self, mock_imap, mock_env_vars): + """List selects a non-INBOX folder.""" + mock_imap.search.return_value = ("OK", [b"1 2"]) + + result = json.loads(mail_tools.proton_mail_list({"folder": "Sent", "limit": 5})) + + mock_imap.select.assert_called_with('"Sent"', readonly=True) + assert result["folder"] == "Sent" + + def test_list_limits_results(self, mock_imap, mock_env_vars): + """List respects the limit parameter.""" + mock_imap.search.return_value = ("OK", [b"1 2 3 4 5 6 7 8 9 10"]) + + result = json.loads(mail_tools.proton_mail_list({"folder": "INBOX", "limit": 3})) + + assert len(result["messages"]) == 3 + + def test_list_imap_error(self, mock_imap, mock_env_vars): + """List returns error on IMAP failure.""" + mock_imap.search.return_value = ("NO", [b"Search failed"]) + + result = json.loads(mail_tools.proton_mail_list({"folder": "INBOX", "limit": 10})) + + assert "error" in result + assert result["success"] is False + + +# ── Mail Read ───────────────────────────────────────────────────────────── + +class TestMailRead: + def test_read_returns_full_message(self, mock_imap, mock_env_vars): + """Read returns subject, from, to, date, body, and uid.""" + mock_imap.uid.return_value = ( + "OK", + [_make_fetch_response(42, "Fancy Subject", "alice@test.com", + "Thu, 4 Jan 2024 14:00:00 +0000", + "This is the message body.\nWith two lines.")] + ) + + result = json.loads(mail_tools.proton_mail_read({"uid": 42, "folder": "INBOX"})) + + assert result["success"] is True + assert result["uid"] == 42 + assert result["subject"] == "Fancy Subject" + assert result["from"] == "alice@test.com" + assert result["body"] == "This is the message body.\nWith two lines." + + def test_read_missing_uid(self, mock_env_vars): + """Read returns error when uid parameter is missing.""" + result = json.loads(mail_tools.proton_mail_read({"folder": "INBOX"})) + + assert "error" in result + + def test_read_uid_not_found(self, mock_imap, mock_env_vars): + """Read returns error when UID fetch returns empty.""" + mock_imap.uid.return_value = ("OK", [None]) + + result = json.loads(mail_tools.proton_mail_read({"uid": 999, "folder": "INBOX"})) + + assert "error" in result + + +# ── Mail Search ─────────────────────────────────────────────────────────── + +class TestMailSearch: + def test_search_by_subject(self, mock_imap, mock_env_vars): + """Search finds messages matching a subject query.""" + mock_imap.search.return_value = ("OK", [b"2 3"]) + mock_imap.fetch.side_effect = [ + ("OK", [_make_fetch_response(2, "Meeting at 3pm", "b@test.com", "Tue, 2 Jan 2024 11:00:00 +0000")]), + ("OK", [_make_fetch_response(3, "Meeting notes", "c@test.com", "Wed, 3 Jan 2024 12:00:00 +0000")]), + ] + + result = json.loads(mail_tools.proton_mail_search({ + "query": "Meeting", "folder": "INBOX", "limit": 10 + })) + + assert result["success"] is True + assert len(result["messages"]) == 2 + assert all("Meeting" in m["subject"] for m in result["messages"]) + + def test_search_from_sender(self, mock_imap, mock_env_vars): + """Search filters by sender.""" + mock_imap.search.return_value = ("OK", [b"1"]) + mock_imap.fetch.return_value = ( + "OK", + [_make_fetch_response(1, "Hello", "specific@test.com", "Mon, 1 Jan 2024 10:00:00 +0000")] + ) + + result = json.loads(mail_tools.proton_mail_search({ + "query": "specific@test.com", "folder": "INBOX", + "limit": 10, "field": "from" + })) + + assert result["success"] is True + assert result["messages"][0]["from"] == "specific@test.com" + + def test_search_no_results(self, mock_imap, mock_env_vars): + """Search returns empty when nothing matches.""" + mock_imap.search.return_value = ("OK", [b""]) + + result = json.loads(mail_tools.proton_mail_search({ + "query": "zzzzzxxxxx", "folder": "INBOX", "limit": 10 + })) + + assert result["success"] is True + assert result["messages"] == [] + + def test_search_requires_query(self, mock_env_vars): + """Search returns error when query is missing or too short.""" + result = json.loads(mail_tools.proton_mail_search({ + "folder": "INBOX", "limit": 10 + })) + assert "error" in result + + +# ── Mail Send ───────────────────────────────────────────────────────────── + +class TestMailSend: + def test_send_plain_text(self, mock_smtp, mock_env_vars): + """Send delivers a plain text email.""" + result = json.loads(mail_tools.proton_mail_send({ + "to": "recipient@example.com", + "subject": "Test Subject", + "body": "Hello from Proton skill!", + })) + + assert result["success"] is True + assert "message_id" in result + + def test_send_with_cc(self, mock_smtp, mock_env_vars): + """Send includes CC recipients.""" + result = json.loads(mail_tools.proton_mail_send({ + "to": "primary@example.com", + "cc": "cc@example.com", + "subject": "Cc Test", + "body": "CC included.", + })) + + assert result["success"] is True + assert mock_smtp.send_message.called + + def test_send_requires_recipients(self, mock_env_vars): + """Send returns error when 'to' is missing.""" + result = json.loads(mail_tools.proton_mail_send({ + "subject": "No Recipient", + "body": "Where does this go?", + })) + assert "error" in result + + def test_send_requires_subject(self, mock_env_vars): + """Send returns error with a helpful message when subject is missing.""" + result = json.loads(mail_tools.proton_mail_send({ + "to": "recipient@example.com", + "body": "No subject", + })) + assert "error" in result + + +# ── Mail Reply ──────────────────────────────────────────────────────────── + +class TestMailReply: + def test_reply_sets_thread_headers(self, mock_imap, mock_smtp, mock_env_vars): + """Reply reads original, sets In-Reply-To and References headers.""" + # Original message fetch + mock_imap.fetch.return_value = ( + "OK", + [_make_fetch_response(10, "Original Thread", "original@test.com", + "Fri, 5 Jan 2024 09:00:00 +0000", + "This is the original email.")] + ) + + result = json.loads(mail_tools.proton_mail_reply({ + "uid": 10, + "body": "Thanks for your email!", + })) + + assert result["success"] is True + + def test_reply_requires_uid(self, mock_env_vars): + """Reply returns error when uid is missing.""" + result = json.loads(mail_tools.proton_mail_reply({ + "body": "Missing uid", + })) + assert "error" in result + + def test_reply_requires_body(self, mock_env_vars): + """Reply returns error when body is missing.""" + result = json.loads(mail_tools.proton_mail_reply({ + "uid": 10, + })) + assert "error" in result