feat(proton-mail): Hermes skill — IMAP/SMTP tools via Bridge

Full Proton Mail Bridge Hermes skill with 6 tools:
- proton_mail_bridge_status — check daemon health
- proton_mail_list — list inbox/folder messages
- proton_mail_read — read full message by UID (body+headers)
- proton_mail_search — search by subject/from/body/all
- proton_mail_send — send email with CC/BCC support
- proton_mail_reply — reply preserving In-Reply-To/References

Implementation: pure Python stdlib (imaplib + smtplib + email),
no external dependencies. 22 unit tests with mocked IMAP/SMTP.

Follows architecture from ARCHITECTURE.md (section 3).
Per-tool auth via PROTONMAIL_ACCOUNT + PROTONMAIL_BRIDGE_PASSWORD env vars.
Bridge runs on 127.0.0.1:1143 (IMAP TLS) / 127.0.0.1:1025 (SMTP STARTTLS).
This commit is contained in:
B.A. Baracus 2026-06-08 18:31:07 +02:00
parent f103d5f44f
commit f8b9991207
No known key found for this signature in database
6 changed files with 2130 additions and 0 deletions

341
skills/proton-mail/SKILL.md Normal file
View file

@ -0,0 +1,341 @@
---
name: proton-mail
description: "Proton Mail via Bridge — read, send, search, and reply to emails using the local Proton Mail Bridge daemon (IMAP 127.0.0.1:1143 / SMTP 127.0.0.1:1025)."
version: 1.0.0
author: Trentuna / B.A. Baracus
license: MIT
platforms: [linux, macos]
metadata:
hermes:
tags: [proton, email, imap, smtp, bridge, productivity]
category: productivity
related_skills: [hermes-agent]
tools:
- proton_mail_bridge_status
- proton_mail_list
- proton_mail_read
- proton_mail_search
- proton_mail_send
- proton_mail_reply
requires_env:
- PROTONMAIL_ACCOUNT
- PROTONMAIL_BRIDGE_PASSWORD
optional_env:
- PROTONMAIL_IMAP_HOST
- PROTONMAIL_IMAP_PORT
- PROTONMAIL_SMTP_HOST
- PROTONMAIL_SMTP_PORT
---
# Proton Mail Bridge — Hermes Skill
Give any Hermes agent native access to Proton Mail via the official [Proton Mail Bridge](https://proton.me/mail/bridge).
The Bridge runs as a local daemon, handles all OpenPGP encryption/decryption transparently, and exposes standard IMAP (read) and SMTP (send) ports on localhost. This skill wraps those ports as Hermes tools.
## How It Works
```
┌──────────────┐ IMAP 127.0.0.1:1143 (TLS) ┌─────────────────┐
│ Hermes │ ───────────────────────────────► │ Proton Bridge │
│ Agent │ │ (local daemon) │
│ (this │ ◄─────────────────────────────── │ │
│ skill) │ SMTP 127.0.0.1:1025 (STARTTLS) │ decrypts PGP │
└──────────────┘ └────────┬────────┘
Proton Servers
```
## Prerequisites
1. **Proton Mail Bridge** installed and running:
- Download: https://proton.me/mail/bridge
- Linux: `protonmail-bridge --cli`
- macOS: `brew install --cask proton-mail-bridge`
2. **Proton Mail account** (Free or paid)
3. **Bridge credentials** — Bridge generates a local app password (NOT your Proton password). Get it from Bridge → Settings → Account → Mailbox configuration → Show password.
## Environment Variables
| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `PROTONMAIL_ACCOUNT` | Yes | — | Your Proton email address (e.g. `user@proton.me`) |
| `PROTONMAIL_BRIDGE_PASSWORD` | Yes | — | Bridge-generated app password |
| `PROTONMAIL_IMAP_HOST` | No | `127.0.0.1` | Bridge IMAP hostname |
| `PROTONMAIL_IMAP_PORT` | No | `1143` | Bridge IMAP port |
| `PROTONMAIL_SMTP_HOST` | No | `127.0.0.1` | Bridge SMTP hostname |
| `PROTONMAIL_SMTP_PORT` | No | `1025` | Bridge SMTP port |
## Tool Reference
### `proton_mail_bridge_status`
Check that the Proton Bridge daemon is running, reachable, and authenticated.
```json
{
"name": "proton_mail_bridge_status",
"description": "Check Proton Mail Bridge status — running, authenticated, connected.",
"parameters": {}
}
```
**Returns:**
- `{"status": "running", "imap": "127.0.0.1:1143", "smtp": "127.0.0.1:1025", "authenticated": true}`
- `{"status": "stopped", "unreachable": ["IMAP 127.0.0.1:1143"]}`
- `{"status": "unconfigured", "error": "PROTONMAIL_ACCOUNT environment variable is not set"}`
---
### `proton_mail_list`
List recent messages in a mailbox folder. Returns headers only (no full body).
```json
{
"name": "proton_mail_list",
"description": "List recent email messages in a folder.",
"parameters": {
"type": "object",
"properties": {
"folder": {"type": "string", "description": "Mailbox folder (INBOX, Sent, Drafts, etc.)", "default": "INBOX"},
"limit": {"type": "integer", "description": "Max messages to return (1-100)", "default": 20}
}
}
}
```
**Returns:**
```json
{
"success": true,
"messages": [
{"uid": 42, "subject": "Meeting tomorrow", "from": "alice@example.com",
"to": "you@proton.me", "date": "Tue, 4 Jun 2024 14:00:00 +0000"}
],
"folder": "INBOX",
"total": 137
}
```
---
### `proton_mail_read`
Read a full email by UID — subject, all headers, and body text.
```json
{
"name": "proton_mail_read",
"description": "Read a full email message including body content.",
"parameters": {
"type": "object",
"properties": {
"uid": {"type": "integer", "description": "UID of the message to read"},
"folder": {"type": "string", "description": "Mailbox folder", "default": "INBOX"}
}
}
}
```
**Returns:**
```json
{
"success": true,
"uid": 42,
"subject": "Meeting tomorrow",
"from": "alice@example.com",
"to": "you@proton.me",
"date": "2024-06-04T14:00:00+00:00",
"body": "Hi, let's meet at 3pm tomorrow.\n\nBest,\nAlice",
"message_id": "<msg42@proton.me>",
"flags": []
}
```
---
### `proton_mail_search`
Search emails across a mailbox by subject, sender, body, or all fields.
```json
{
"name": "proton_mail_search",
"description": "Search email messages by query in a specific field.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query text (min 2 characters)"},
"field": {"type": "string", "description": "Field to search (subject, from, body, or all)", "enum": ["subject", "from", "body", "all"], "default": "all"},
"folder": {"type": "string", "description": "Mailbox folder", "default": "INBOX"},
"limit": {"type": "integer", "description": "Max results (1-100)", "default": 20}
}
}
}
```
**Returns:** Same shape as `proton_mail_list` plus `"query": "Meeting"`.
---
### `proton_mail_send`
Send a new email via Bridge SMTP.
```json
{
"name": "proton_mail_send",
"description": "Send a new email message.",
"parameters": {
"type": "object",
"properties": {
"to": {"type": "string", "description": "Recipient email(s), comma-separated"},
"cc": {"type": "string", "description": "CC recipient(s), comma-separated"},
"bcc": {"type": "string", "description": "BCC recipient(s), comma-separated"},
"subject": {"type": "string", "description": "Email subject"},
"body": {"type": "string", "description": "Email body text (plain text)"}
}
}
}
```
**Returns:**
```json
{
"success": true,
"message_id": "<hermes-...@proton.me>",
"to": "bob@example.com",
"subject": "Hello"
}
```
---
### `proton_mail_reply`
Reply to an existing email, preserving thread context (In-Reply-To and References headers).
```json
{
"name": "proton_mail_reply",
"description": "Reply to an existing email, preserving thread headers.",
"parameters": {
"type": "object",
"properties": {
"uid": {"type": "integer", "description": "UID of the email to reply to"},
"body": {"type": "string", "description": "Reply body text"},
"folder": {"type": "string", "description": "Mailbox folder", "default": "INBOX"}
}
}
}
```
**Returns:**
```json
{
"success": true,
"message_id": "<hermes-reply-...@proton.me>",
"in_reply_to": "<original-msg-id@proton.me>",
"to": "alice@example.com",
"subject": "Re: Original Subject"
}
```
## Setup
### 1. Install Proton Mail Bridge
```bash
# Linux (headless)
protonmail-bridge --cli
# Follow interactive setup — login with Proton credentials
# macOS
brew install --wait proton-mail-bridge
```
### 2. Get Bridge password
In Bridge: Settings → your account → Mailbox configuration → Show password.
This is a **Bridge-generated local password**, not your Proton password.
### 3. Set environment variables
```bash
export PROTONMAIL_ACCOUNT="your-email@proton.me"
export PROTONMAIL_BRIDGE_PASSWORD="bridge-generated-password"
```
Or add to your Hermes profile's `.env` at `~/.hermes/profiles/<name>/.env`:
```env
PROTONMAIL_ACCOUNT=your-email@proton.me
PROTONMAIL_BRIDGE_PASSWORD=bridge-generated-password
```
### 4. Verify
Call `proton_mail_bridge_status` — you should see `"status": "running"` and `"authenticated": true`.
## Example Workflows
**Quick inbox check:**
```
proton_mail_list({"folder": "INBOX", "limit": 5})
```
**Read and reply:**
```
1. proton_mail_list({"limit": 10})
2. proton_mail_read({"uid": 42})
3. proton_mail_reply({"uid": 42, "body": "Thanks, got it!"})
```
**Search and respond:**
```
1. proton_mail_search({"query": "invoice", "field": "subject"})
2. proton_mail_read({"uid": result.uid})
3. proton_mail_send({"to": result.from, "subject": "Re: invoice", "body": "..."})
```
**Send with CC:**
```
proton_mail_send({
"to": "team@example.com",
"cc": "manager@example.com",
"subject": "Status Update",
"body": "All good here."
})
```
## Implementation
The skill is implemented in pure Python using standard library modules (`imaplib`, `smtplib`, `email`). No external dependencies.
Reference implementation at `<skill-dir>/references/tools.py`.
### Security
- **Connections are localhost-only** — Bridge listens on `127.0.0.1` only
- **TLS on IMAP**`IMAP4_SSL` connects to port 1143
- **STARTTLS on SMTP** — explicit TLS negotiation on port 1025
- **Bridge password is NOT your Proton password** — defense-in-depth via Bridge's separate auth
- **Credential injection prevented**`_sanitize_search_term()` strips control characters and IMAP-special chars from user input
- **No secrets in tool calls** — credentials come from environment, never from tool arguments
## Known Limitations
- **Plain text body only** — HTML rendering is not available; HTML emails return the raw HTML source
- **No attachment handling yet**`proton_mail_read` returns body text only. Attachments are present in the MIME structure but not extracted separately
- **Localhost-only** — the Bridge must run on the same machine as Hermes
- **Bridge required** — the skill doesn't work without the Bridge daemon; it can't log into Proton API directly
- **Single account** — one Bridge instance serves one account; multi-account requires multiple Bridge instances
## References
- [Proton Mail Bridge](https://proton.me/mail/bridge) — official download and docs
- [openclaw-protonmail-skill](https://github.com/rvacyber/openclaw-protonmail-skill) — OpenClaw analogue (TypeScript)
- [emersion/hydroxide](https://github.com/emersion/hydroxide) — third-party Bridge alternative for headless servers
- [Hermes-Proton Architecture](../ARCHITECTURE.md) — full architecture document

View file

@ -0,0 +1,726 @@
"""Proton Mail Bridge Hermes skill — IMAP/SMTP tool handlers.
Requires Proton Mail Bridge running locally:
IMAP: 127.0.0.1:1143 (TLS)
SMTP: 127.0.0.1:1025 (STARTTLS)
Credentials from env:
PROTONMAIL_ACCOUNT email address (e.g. user@proton.me)
PROTONMAIL_BRIDGE_PASSWORD Bridge-generated app password
PROTONMAIL_IMAP_HOST optional, default 127.0.0.1
PROTONMAIL_IMAP_PORT optional, default 1143
PROTONMAIL_SMTP_HOST optional, default 127.0.0.1
PROTONMAIL_SMTP_PORT optional, default 1025
"""
import email
import imaplib
import json
import os
import smtplib
import socket
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import decode_header
from email.utils import formatdate, parsedate_to_datetime
from typing import Optional
# ── Constants ──────────────────────────────────────────────────────────
DEFAULT_IMAP_HOST = "127.0.0.1"
DEFAULT_IMAP_PORT = 1143
DEFAULT_SMTP_HOST = "127.0.0.1"
DEFAULT_SMTP_PORT = 1025
FETCH_TIMEOUT = 30
LIST_MAX_DEFAULT = 20
# ── Credential helpers ─────────────────────────────────────────────────
def _get_config() -> dict:
"""Read Bridge connection config from environment."""
account = os.environ.get("PROTONMAIL_ACCOUNT")
password = os.environ.get("PROTONMAIL_BRIDGE_PASSWORD")
return {
"account": account,
"password": password,
"imap_host": os.environ.get("PROTONMAIL_IMAP_HOST", DEFAULT_IMAP_HOST),
"imap_port": int(os.environ.get("PROTONMAIL_IMAP_PORT", DEFAULT_IMAP_PORT)),
"smtp_host": os.environ.get("PROTONMAIL_SMTP_HOST", DEFAULT_SMTP_HOST),
"smtp_port": int(os.environ.get("PROTONMAIL_SMTP_PORT", DEFAULT_SMTP_PORT)),
}
def _check_credentials(cfg: dict) -> Optional[str]:
"""Return error string if credentials are missing, else None."""
if not cfg["account"]:
return "PROTONMAIL_ACCOUNT environment variable is not set"
if not cfg["password"]:
return "PROTONMAIL_BRIDGE_PASSWORD environment variable is not set"
return None
def _sanitize_search_term(term: str) -> str:
"""Sanitize an IMAP search term — strip control characters and quotes.
Prevents IMAP injection. Only allows printable ASCII + common unicode
letters used in subjects.
"""
# Remove characters that could break the IMAP SEARCH command
sanitized = "".join(c for c in term if c.isprintable() and c not in '()"*%\\')
return sanitized.strip()
# ── IMAP connection ────────────────────────────────────────────────────
def _open_imap(cfg: dict) -> tuple[imaplib.IMAP4_SSL, Optional[str]]:
"""Connect and authenticate to Bridge IMAP.
Returns (connection, None) on success, (None, error_string) on failure.
"""
try:
conn = imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], timeout=FETCH_TIMEOUT)
status, data = conn.login(cfg["account"], cfg["password"])
if status != "OK":
msg = data[0].decode("utf-8", errors="replace") if data else "Login failed"
conn.logout()
return None, msg
return conn, None
except (ConnectionRefusedError, socket.timeout, OSError) as e:
return None, f"Cannot connect to Bridge IMAP at {cfg['imap_host']}:{cfg['imap_port']}: {e}"
except imaplib.IMAP4.error as e:
return None, f"IMAP error: {e}"
def _decode_mime_header(value: str | bytes | None) -> str:
"""Decode a MIME-encoded header value to plain text."""
if value is None:
return ""
if isinstance(value, bytes):
value = value.decode("utf-8", errors="replace")
decoded_parts = decode_header(value)
parts = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
try:
parts.append(part.decode(charset or "utf-8", errors="replace"))
except LookupError:
parts.append(part.decode("utf-8", errors="replace"))
else:
parts.append(str(part))
return " ".join(parts)
def _get_body_text(msg: email.message.Message) -> str:
"""Extract plain text body from an email message.
Prefers text/plain; falls back to text/html with note.
"""
if msg.is_multipart():
# First try text/plain
for part in msg.walk():
ctype = part.get_content_type()
if ctype == "text/plain":
payload = part.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
# Fall back to text/html
for part in msg.walk():
ctype = part.get_content_type()
if ctype == "text/html":
payload = part.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
return ""
else:
payload = msg.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
return ""
def _parse_fetch_response(msg_data: list) -> dict | None:
"""Parse a single IMAP FETCH response into a structured dict.
Returns dict with uid, subject, from_, to, date, body, message_id, flags,
or None if the data can't be parsed.
"""
if not msg_data or msg_data[0] is None:
return None
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
uid_flags_bytes, rfc822_bytes = raw
else:
return None
# Parse the RFC822 message
try:
msg = email.message_from_bytes(rfc822_bytes)
except Exception:
return None
# Extract UID from the wrapper — look for "UID <number>" keyword
uid_str = uid_flags_bytes.decode("utf-8", errors="replace")
uid = None
parts = uid_str.split()
for i, part in enumerate(parts):
keyword = part.lstrip("(").upper()
if keyword == "UID" and i + 1 < len(parts):
candidate = parts[i + 1].rstrip(")")
if candidate.isdigit():
uid = int(candidate)
break
subject = _decode_mime_header(msg.get("Subject", ""))
sender = _decode_mime_header(msg.get("From", ""))
recipient = _decode_mime_header(msg.get("To", ""))
date_raw = msg.get("Date", "")
message_id = msg.get("Message-ID", "")
# Parse date
date_iso = ""
if date_raw:
try:
dt = parsedate_to_datetime(date_raw)
date_iso = dt.isoformat()
except (ValueError, TypeError):
date_iso = date_raw
body = _get_body_text(msg)
return {
"uid": uid,
"subject": subject,
"from": sender,
"to": recipient,
"date": date_iso,
"body": body,
"message_id": message_id,
"flags": [],
}
# ── SMTP connection ────────────────────────────────────────────────────
def _open_smtp(cfg: dict) -> tuple[smtplib.SMTP | None, str | None]:
"""Connect and authenticate to Bridge SMTP.
Returns (connection, None) on success, (None, error_string) on failure.
"""
try:
conn = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=FETCH_TIMEOUT)
conn.starttls()
conn.login(cfg["account"], cfg["password"])
return conn, None
except (ConnectionRefusedError, socket.timeout, OSError) as e:
return None, f"Cannot connect to Bridge SMTP: {e}"
except smtplib.SMTPException as e:
return None, f"SMTP error: {e}"
# ── Port check ─────────────────────────────────────────────────────────
def _check_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
"""Check if a TCP port is open (Bridge running check)."""
try:
s = socket.create_connection((host, port), timeout=timeout)
s.close()
return True
except (ConnectionRefusedError, socket.timeout, OSError):
return False
# ── Tool Handlers ──────────────────────────────────────────────────────
def proton_mail_bridge_status(args: dict) -> str:
"""Check Proton Bridge status — running, authenticated, and reachable.
Tool schema:
name: proton_mail_bridge_status
description: Check if Proton Mail Bridge daemon is running and reachable
parameters: {}
"""
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"status": "unconfigured", "error": cred_err})
imap_up = _check_port_open(cfg["imap_host"], cfg["imap_port"])
smtp_up = _check_port_open(cfg["smtp_host"], cfg["smtp_port"])
if imap_up and smtp_up:
# Try a real IMAP login to confirm auth works
conn, err = _open_imap(cfg)
if conn:
conn.logout()
return json.dumps({
"status": "running",
"imap": f"{cfg['imap_host']}:{cfg['imap_port']}",
"smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}",
"authenticated": True,
})
else:
return json.dumps({
"status": "running",
"imap": f"{cfg['imap_host']}:{cfg['imap_port']}",
"smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}",
"authenticated": False,
"auth_error": err,
})
ports = []
if not imap_up:
ports.append(f"IMAP {cfg['imap_host']}:{cfg['imap_port']}")
if not smtp_up:
ports.append(f"SMTP {cfg['smtp_host']}:{cfg['smtp_port']}")
return json.dumps({
"status": "stopped",
"unreachable": ports,
})
def proton_mail_list(args: dict) -> str:
"""List messages in a mailbox folder.
Tool schema:
name: proton_mail_list
description: List recent email messages in a folder (default INBOX)
parameters:
type: object
properties:
folder:
type: string
description: Mailbox folder (INBOX, Sent, Drafts, etc.)
default: INBOX
limit:
type: integer
description: Max messages to return (1-100)
default: 20
"""
folder = args.get("folder", "INBOX")
limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100)
limit = max(limit, 1)
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
status, data = conn.select(f'"{folder}"', readonly=True)
if status != "OK":
return json.dumps({
"success": False, "error": f"Cannot select folder '{folder}'"
})
# Search all messages, newest first
status, data = conn.search(None, "ALL")
if status != "OK":
return json.dumps({
"success": False, "error": "IMAP search failed"
})
uids = data[0].split() if data[0] else []
if not uids:
return json.dumps({
"success": True, "messages": [], "folder": folder
})
# Take the N most recent (last N in the list)
recent_uids = uids[-limit:]
messages = []
for uid_bytes in recent_uids:
uid = uid_bytes.decode("ascii")
status, msg_data = conn.fetch(uid, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])")
if status != "OK" or not msg_data or msg_data[0] is None:
continue
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
header_bytes = raw[1]
msg = email.message_from_bytes(header_bytes)
messages.append({
"uid": int(uid),
"subject": _decode_mime_header(msg.get("Subject", "(no subject)")),
"from": _decode_mime_header(msg.get("From", "")),
"to": _decode_mime_header(msg.get("To", "")),
"date": msg.get("Date", ""),
})
return json.dumps({
"success": True,
"messages": messages,
"folder": folder,
"total": len(uids),
})
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_read(args: dict) -> str:
"""Read a full email message by UID.
Tool schema:
name: proton_mail_read
description: Read a full email message including body content
parameters:
type: object
properties:
uid:
type: integer
description: UID of the message to read
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
"""
uid = args.get("uid")
if uid is None:
return json.dumps({"success": False, "error": "Missing required parameter: uid"})
folder = args.get("folder", "INBOX")
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)")
if status != "OK" or not msg_data or msg_data[0] is None:
return json.dumps({
"success": False, "error": f"Message UID {uid} not found in {folder}"
})
parsed = _parse_fetch_response(msg_data)
if parsed is None:
return json.dumps({"success": False, "error": "Failed to parse message"})
parsed["success"] = True
return json.dumps(parsed)
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_search(args: dict) -> str:
"""Search messages across a mailbox folder.
Tool schema:
name: proton_mail_search
description: Search email messages by query in a specific field
parameters:
type: object
properties:
query:
type: string
description: Search query text (min 2 characters)
field:
type: string
description: Field to search (subject, from, body, or all)
enum: [subject, from, body, all]
default: all
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
limit:
type: integer
description: Max results (1-100)
default: 20
"""
query = args.get("query", "")
if not query or len(query.strip()) < 2:
return json.dumps({
"success": False,
"error": "Search query must be at least 2 characters",
})
field = args.get("field", "all")
folder = args.get("folder", "INBOX")
limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100)
limit = max(limit, 1)
query = _sanitize_search_term(query)
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
# Build IMAP SEARCH criteria
search_criteria = []
if field == "subject":
search_criteria = ["SUBJECT", query]
elif field == "from":
search_criteria = ["FROM", query]
elif field == "body":
search_criteria = ["BODY", query]
else:
# Search across all fields
search_criteria = ["OR", "OR", "SUBJECT", query, "FROM", query, "BODY", query]
status, data = conn.search(None, *search_criteria)
if status != "OK":
return json.dumps({"success": False, "error": "IMAP search failed"})
uid_list = data[0].split() if data[0] else []
if not uid_list:
return json.dumps({"success": True, "messages": [], "folder": folder})
recent = uid_list[-limit:]
messages = []
for uid_bytes in recent:
uid_str = uid_bytes.decode("ascii")
status, msg_data = conn.fetch(uid_str, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])")
if status != "OK" or not msg_data or msg_data[0] is None:
continue
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
header_bytes = raw[1]
msg = email.message_from_bytes(header_bytes)
messages.append({
"uid": int(uid_str),
"subject": _decode_mime_header(msg.get("Subject", "(no subject)")),
"from": _decode_mime_header(msg.get("From", "")),
"to": _decode_mime_header(msg.get("To", "")),
"date": msg.get("Date", ""),
})
return json.dumps({
"success": True,
"messages": messages,
"folder": folder,
"query": query,
})
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_send(args: dict) -> str:
"""Send a new email via Proton Bridge SMTP.
Tool schema:
name: proton_mail_send
description: Send a new email message
parameters:
type: object
properties:
to:
type: string
description: Recipient email address(es), comma-separated
cc:
type: string
description: CC recipient(s), comma-separated
bcc:
type: string
description: BCC recipient(s), comma-separated
subject:
type: string
description: Email subject line
body:
type: string
description: Email body text (plain text)
"""
to = args.get("to", "")
cc = args.get("cc", "")
bcc = args.get("bcc", "")
subject = args.get("subject", "").strip()
body = args.get("body", "")
if not to:
return json.dumps({"success": False, "error": "Missing required parameter: to"})
if not subject:
return json.dumps({"success": False, "error": "Missing required parameter: subject"})
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_smtp(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = cfg["account"]
msg["To"] = to
msg["Date"] = formatdate(localtime=True)
msg["X-Mailer"] = "hermes-proton-mail-skill"
if cc:
msg["Cc"] = cc
recipients = [r.strip() for r in to.split(",") if r.strip()]
if cc:
recipients.extend(r.strip() for r in cc.split(",") if r.strip())
if bcc:
recipients.extend(r.strip() for r in bcc.split(",") if r.strip())
bcc_list = [r.strip() for r in bcc.split(",") if r.strip()]
else:
bcc_list = []
# Send — SMTP lib handles BCC by not adding to the envelope
errors, message_id = conn.send_message(msg, from_addr=cfg["account"],
to_addrs=recipients)
return json.dumps({
"success": True,
"message_id": message_id,
"to": to,
"subject": subject,
})
except smtplib.SMTPException as e:
return json.dumps({"success": False, "error": f"SMTP error: {e}"})
finally:
try:
conn.quit()
except Exception:
pass
def proton_mail_reply(args: dict) -> str:
"""Reply to an existing email by UID.
Tool schema:
name: proton_mail_reply
description: Reply to an existing email, preserving thread headers
parameters:
type: object
properties:
uid:
type: integer
description: UID of the message to reply to
body:
type: string
description: Reply body text
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
"""
uid = args.get("uid")
body = args.get("body", "")
if uid is None:
return json.dumps({"success": False, "error": "Missing required parameter: uid"})
if not body:
return json.dumps({"success": False, "error": "Missing required parameter: body"})
folder = args.get("folder", "INBOX")
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
# Read original message to get thread context
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)")
if status != "OK" or not msg_data or msg_data[0] is None:
return json.dumps({
"success": False, "error": f"Message UID {uid} not found"
})
parsed = _parse_fetch_response(msg_data)
if parsed is None:
return json.dumps({"success": False, "error": "Failed to parse original message"})
original_msg_id = parsed.get("message_id", "")
original_subject = parsed.get("subject", "")
original_from = parsed.get("from", "")
original_date = parsed.get("date", "")
conn.logout()
except imaplib.IMAP4.error as e:
try:
conn.logout()
except Exception:
pass
return json.dumps({"success": False, "error": f"IMAP error reading original: {e}"})
# Build thread headers
subject = original_subject
if not subject.lower().startswith("re:"):
subject = f"Re: {subject}"
smtp_conn, smtp_err = _open_smtp(cfg)
if smtp_err:
return json.dumps({"success": False, "error": smtp_err})
try:
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = cfg["account"]
msg["To"] = original_from
msg["Date"] = formatdate(localtime=True)
msg["In-Reply-To"] = original_msg_id
msg["References"] = original_msg_id
msg["X-Mailer"] = "hermes-proton-mail-skill"
errors, message_id = smtp_conn.send_message(
msg, from_addr=cfg["account"], to_addrs=[original_from])
return json.dumps({
"success": True,
"message_id": message_id,
"in_reply_to": original_msg_id,
"to": original_from,
"subject": subject,
})
except smtplib.SMTPException as e:
return json.dumps({"success": False, "error": f"SMTP error: {e}"})
finally:
try:
smtp_conn.quit()
except Exception:
pass

View file

@ -0,0 +1 @@
# proton-mail — Hermes skill for Proton Mail via Bridge IMAP/SMTP

726
skills/proton_mail/tools.py Normal file
View file

@ -0,0 +1,726 @@
"""Proton Mail Bridge Hermes skill — IMAP/SMTP tool handlers.
Requires Proton Mail Bridge running locally:
IMAP: 127.0.0.1:1143 (TLS)
SMTP: 127.0.0.1:1025 (STARTTLS)
Credentials from env:
PROTONMAIL_ACCOUNT email address (e.g. user@proton.me)
PROTONMAIL_BRIDGE_PASSWORD Bridge-generated app password
PROTONMAIL_IMAP_HOST optional, default 127.0.0.1
PROTONMAIL_IMAP_PORT optional, default 1143
PROTONMAIL_SMTP_HOST optional, default 127.0.0.1
PROTONMAIL_SMTP_PORT optional, default 1025
"""
import email
import imaplib
import json
import os
import smtplib
import socket
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import decode_header
from email.utils import formatdate, parsedate_to_datetime
from typing import Optional
# ── Constants ──────────────────────────────────────────────────────────
DEFAULT_IMAP_HOST = "127.0.0.1"
DEFAULT_IMAP_PORT = 1143
DEFAULT_SMTP_HOST = "127.0.0.1"
DEFAULT_SMTP_PORT = 1025
FETCH_TIMEOUT = 30
LIST_MAX_DEFAULT = 20
# ── Credential helpers ─────────────────────────────────────────────────
def _get_config() -> dict:
"""Read Bridge connection config from environment."""
account = os.environ.get("PROTONMAIL_ACCOUNT")
password = os.environ.get("PROTONMAIL_BRIDGE_PASSWORD")
return {
"account": account,
"password": password,
"imap_host": os.environ.get("PROTONMAIL_IMAP_HOST", DEFAULT_IMAP_HOST),
"imap_port": int(os.environ.get("PROTONMAIL_IMAP_PORT", DEFAULT_IMAP_PORT)),
"smtp_host": os.environ.get("PROTONMAIL_SMTP_HOST", DEFAULT_SMTP_HOST),
"smtp_port": int(os.environ.get("PROTONMAIL_SMTP_PORT", DEFAULT_SMTP_PORT)),
}
def _check_credentials(cfg: dict) -> Optional[str]:
"""Return error string if credentials are missing, else None."""
if not cfg["account"]:
return "PROTONMAIL_ACCOUNT environment variable is not set"
if not cfg["password"]:
return "PROTONMAIL_BRIDGE_PASSWORD environment variable is not set"
return None
def _sanitize_search_term(term: str) -> str:
"""Sanitize an IMAP search term — strip control characters and quotes.
Prevents IMAP injection. Only allows printable ASCII + common unicode
letters used in subjects.
"""
# Remove characters that could break the IMAP SEARCH command
sanitized = "".join(c for c in term if c.isprintable() and c not in '()"*%\\')
return sanitized.strip()
# ── IMAP connection ────────────────────────────────────────────────────
def _open_imap(cfg: dict) -> tuple[imaplib.IMAP4_SSL, Optional[str]]:
"""Connect and authenticate to Bridge IMAP.
Returns (connection, None) on success, (None, error_string) on failure.
"""
try:
conn = imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], timeout=FETCH_TIMEOUT)
status, data = conn.login(cfg["account"], cfg["password"])
if status != "OK":
msg = data[0].decode("utf-8", errors="replace") if data else "Login failed"
conn.logout()
return None, msg
return conn, None
except (ConnectionRefusedError, socket.timeout, OSError) as e:
return None, f"Cannot connect to Bridge IMAP at {cfg['imap_host']}:{cfg['imap_port']}: {e}"
except imaplib.IMAP4.error as e:
return None, f"IMAP error: {e}"
def _decode_mime_header(value: str | bytes | None) -> str:
"""Decode a MIME-encoded header value to plain text."""
if value is None:
return ""
if isinstance(value, bytes):
value = value.decode("utf-8", errors="replace")
decoded_parts = decode_header(value)
parts = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
try:
parts.append(part.decode(charset or "utf-8", errors="replace"))
except LookupError:
parts.append(part.decode("utf-8", errors="replace"))
else:
parts.append(str(part))
return " ".join(parts)
def _get_body_text(msg: email.message.Message) -> str:
"""Extract plain text body from an email message.
Prefers text/plain; falls back to text/html with note.
"""
if msg.is_multipart():
# First try text/plain
for part in msg.walk():
ctype = part.get_content_type()
if ctype == "text/plain":
payload = part.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
# Fall back to text/html
for part in msg.walk():
ctype = part.get_content_type()
if ctype == "text/html":
payload = part.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
return ""
else:
payload = msg.get_payload(decode=True)
if payload:
return payload.decode("utf-8", errors="replace")
return ""
def _parse_fetch_response(msg_data: list) -> dict | None:
"""Parse a single IMAP FETCH response into a structured dict.
Returns dict with uid, subject, from_, to, date, body, message_id, flags,
or None if the data can't be parsed.
"""
if not msg_data or msg_data[0] is None:
return None
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
uid_flags_bytes, rfc822_bytes = raw
else:
return None
# Parse the RFC822 message
try:
msg = email.message_from_bytes(rfc822_bytes)
except Exception:
return None
# Extract UID from the wrapper — look for "UID <number>" keyword
uid_str = uid_flags_bytes.decode("utf-8", errors="replace")
uid = None
parts = uid_str.split()
for i, part in enumerate(parts):
keyword = part.lstrip("(").upper()
if keyword == "UID" and i + 1 < len(parts):
candidate = parts[i + 1].rstrip(")")
if candidate.isdigit():
uid = int(candidate)
break
subject = _decode_mime_header(msg.get("Subject", ""))
sender = _decode_mime_header(msg.get("From", ""))
recipient = _decode_mime_header(msg.get("To", ""))
date_raw = msg.get("Date", "")
message_id = msg.get("Message-ID", "")
# Parse date
date_iso = ""
if date_raw:
try:
dt = parsedate_to_datetime(date_raw)
date_iso = dt.isoformat()
except (ValueError, TypeError):
date_iso = date_raw
body = _get_body_text(msg)
return {
"uid": uid,
"subject": subject,
"from": sender,
"to": recipient,
"date": date_iso,
"body": body,
"message_id": message_id,
"flags": [],
}
# ── SMTP connection ────────────────────────────────────────────────────
def _open_smtp(cfg: dict) -> tuple[smtplib.SMTP | None, str | None]:
"""Connect and authenticate to Bridge SMTP.
Returns (connection, None) on success, (None, error_string) on failure.
"""
try:
conn = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=FETCH_TIMEOUT)
conn.starttls()
conn.login(cfg["account"], cfg["password"])
return conn, None
except (ConnectionRefusedError, socket.timeout, OSError) as e:
return None, f"Cannot connect to Bridge SMTP: {e}"
except smtplib.SMTPException as e:
return None, f"SMTP error: {e}"
# ── Port check ─────────────────────────────────────────────────────────
def _check_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
"""Check if a TCP port is open (Bridge running check)."""
try:
s = socket.create_connection((host, port), timeout=timeout)
s.close()
return True
except (ConnectionRefusedError, socket.timeout, OSError):
return False
# ── Tool Handlers ──────────────────────────────────────────────────────
def proton_mail_bridge_status(args: dict) -> str:
"""Check Proton Bridge status — running, authenticated, and reachable.
Tool schema:
name: proton_mail_bridge_status
description: Check if Proton Mail Bridge daemon is running and reachable
parameters: {}
"""
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"status": "unconfigured", "error": cred_err})
imap_up = _check_port_open(cfg["imap_host"], cfg["imap_port"])
smtp_up = _check_port_open(cfg["smtp_host"], cfg["smtp_port"])
if imap_up and smtp_up:
# Try a real IMAP login to confirm auth works
conn, err = _open_imap(cfg)
if conn:
conn.logout()
return json.dumps({
"status": "running",
"imap": f"{cfg['imap_host']}:{cfg['imap_port']}",
"smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}",
"authenticated": True,
})
else:
return json.dumps({
"status": "running",
"imap": f"{cfg['imap_host']}:{cfg['imap_port']}",
"smtp": f"{cfg['smtp_host']}:{cfg['smtp_port']}",
"authenticated": False,
"auth_error": err,
})
ports = []
if not imap_up:
ports.append(f"IMAP {cfg['imap_host']}:{cfg['imap_port']}")
if not smtp_up:
ports.append(f"SMTP {cfg['smtp_host']}:{cfg['smtp_port']}")
return json.dumps({
"status": "stopped",
"unreachable": ports,
})
def proton_mail_list(args: dict) -> str:
"""List messages in a mailbox folder.
Tool schema:
name: proton_mail_list
description: List recent email messages in a folder (default INBOX)
parameters:
type: object
properties:
folder:
type: string
description: Mailbox folder (INBOX, Sent, Drafts, etc.)
default: INBOX
limit:
type: integer
description: Max messages to return (1-100)
default: 20
"""
folder = args.get("folder", "INBOX")
limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100)
limit = max(limit, 1)
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
status, data = conn.select(f'"{folder}"', readonly=True)
if status != "OK":
return json.dumps({
"success": False, "error": f"Cannot select folder '{folder}'"
})
# Search all messages, newest first
status, data = conn.search(None, "ALL")
if status != "OK":
return json.dumps({
"success": False, "error": "IMAP search failed"
})
uids = data[0].split() if data[0] else []
if not uids:
return json.dumps({
"success": True, "messages": [], "folder": folder
})
# Take the N most recent (last N in the list)
recent_uids = uids[-limit:]
messages = []
for uid_bytes in recent_uids:
uid = uid_bytes.decode("ascii")
status, msg_data = conn.fetch(uid, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])")
if status != "OK" or not msg_data or msg_data[0] is None:
continue
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
header_bytes = raw[1]
msg = email.message_from_bytes(header_bytes)
messages.append({
"uid": int(uid),
"subject": _decode_mime_header(msg.get("Subject", "(no subject)")),
"from": _decode_mime_header(msg.get("From", "")),
"to": _decode_mime_header(msg.get("To", "")),
"date": msg.get("Date", ""),
})
return json.dumps({
"success": True,
"messages": messages,
"folder": folder,
"total": len(uids),
})
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_read(args: dict) -> str:
"""Read a full email message by UID.
Tool schema:
name: proton_mail_read
description: Read a full email message including body content
parameters:
type: object
properties:
uid:
type: integer
description: UID of the message to read
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
"""
uid = args.get("uid")
if uid is None:
return json.dumps({"success": False, "error": "Missing required parameter: uid"})
folder = args.get("folder", "INBOX")
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)")
if status != "OK" or not msg_data or msg_data[0] is None:
return json.dumps({
"success": False, "error": f"Message UID {uid} not found in {folder}"
})
parsed = _parse_fetch_response(msg_data)
if parsed is None:
return json.dumps({"success": False, "error": "Failed to parse message"})
parsed["success"] = True
return json.dumps(parsed)
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_search(args: dict) -> str:
"""Search messages across a mailbox folder.
Tool schema:
name: proton_mail_search
description: Search email messages by query in a specific field
parameters:
type: object
properties:
query:
type: string
description: Search query text (min 2 characters)
field:
type: string
description: Field to search (subject, from, body, or all)
enum: [subject, from, body, all]
default: all
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
limit:
type: integer
description: Max results (1-100)
default: 20
"""
query = args.get("query", "")
if not query or len(query.strip()) < 2:
return json.dumps({
"success": False,
"error": "Search query must be at least 2 characters",
})
field = args.get("field", "all")
folder = args.get("folder", "INBOX")
limit = min(int(args.get("limit", LIST_MAX_DEFAULT)), 100)
limit = max(limit, 1)
query = _sanitize_search_term(query)
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
# Build IMAP SEARCH criteria
search_criteria = []
if field == "subject":
search_criteria = ["SUBJECT", query]
elif field == "from":
search_criteria = ["FROM", query]
elif field == "body":
search_criteria = ["BODY", query]
else:
# Search across all fields
search_criteria = ["OR", "OR", "SUBJECT", query, "FROM", query, "BODY", query]
status, data = conn.search(None, *search_criteria)
if status != "OK":
return json.dumps({"success": False, "error": "IMAP search failed"})
uid_list = data[0].split() if data[0] else []
if not uid_list:
return json.dumps({"success": True, "messages": [], "folder": folder})
recent = uid_list[-limit:]
messages = []
for uid_bytes in recent:
uid_str = uid_bytes.decode("ascii")
status, msg_data = conn.fetch(uid_str, "(UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (SUBJECT FROM TO DATE MESSAGE-ID)])")
if status != "OK" or not msg_data or msg_data[0] is None:
continue
raw = msg_data[0]
if isinstance(raw, tuple) and len(raw) == 2:
header_bytes = raw[1]
msg = email.message_from_bytes(header_bytes)
messages.append({
"uid": int(uid_str),
"subject": _decode_mime_header(msg.get("Subject", "(no subject)")),
"from": _decode_mime_header(msg.get("From", "")),
"to": _decode_mime_header(msg.get("To", "")),
"date": msg.get("Date", ""),
})
return json.dumps({
"success": True,
"messages": messages,
"folder": folder,
"query": query,
})
except imaplib.IMAP4.error as e:
return json.dumps({"success": False, "error": f"IMAP error: {e}"})
finally:
try:
conn.logout()
except Exception:
pass
def proton_mail_send(args: dict) -> str:
"""Send a new email via Proton Bridge SMTP.
Tool schema:
name: proton_mail_send
description: Send a new email message
parameters:
type: object
properties:
to:
type: string
description: Recipient email address(es), comma-separated
cc:
type: string
description: CC recipient(s), comma-separated
bcc:
type: string
description: BCC recipient(s), comma-separated
subject:
type: string
description: Email subject line
body:
type: string
description: Email body text (plain text)
"""
to = args.get("to", "")
cc = args.get("cc", "")
bcc = args.get("bcc", "")
subject = args.get("subject", "").strip()
body = args.get("body", "")
if not to:
return json.dumps({"success": False, "error": "Missing required parameter: to"})
if not subject:
return json.dumps({"success": False, "error": "Missing required parameter: subject"})
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
conn, err = _open_smtp(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = cfg["account"]
msg["To"] = to
msg["Date"] = formatdate(localtime=True)
msg["X-Mailer"] = "hermes-proton-mail-skill"
if cc:
msg["Cc"] = cc
recipients = [r.strip() for r in to.split(",") if r.strip()]
if cc:
recipients.extend(r.strip() for r in cc.split(",") if r.strip())
if bcc:
recipients.extend(r.strip() for r in bcc.split(",") if r.strip())
bcc_list = [r.strip() for r in bcc.split(",") if r.strip()]
else:
bcc_list = []
# Send — SMTP lib handles BCC by not adding to the envelope
errors, message_id = conn.send_message(msg, from_addr=cfg["account"],
to_addrs=recipients)
return json.dumps({
"success": True,
"message_id": message_id,
"to": to,
"subject": subject,
})
except smtplib.SMTPException as e:
return json.dumps({"success": False, "error": f"SMTP error: {e}"})
finally:
try:
conn.quit()
except Exception:
pass
def proton_mail_reply(args: dict) -> str:
"""Reply to an existing email by UID.
Tool schema:
name: proton_mail_reply
description: Reply to an existing email, preserving thread headers
parameters:
type: object
properties:
uid:
type: integer
description: UID of the message to reply to
body:
type: string
description: Reply body text
folder:
type: string
description: Mailbox folder (default INBOX)
default: INBOX
"""
uid = args.get("uid")
body = args.get("body", "")
if uid is None:
return json.dumps({"success": False, "error": "Missing required parameter: uid"})
if not body:
return json.dumps({"success": False, "error": "Missing required parameter: body"})
folder = args.get("folder", "INBOX")
cfg = _get_config()
cred_err = _check_credentials(cfg)
if cred_err:
return json.dumps({"success": False, "error": cred_err})
# Read original message to get thread context
conn, err = _open_imap(cfg)
if err:
return json.dumps({"success": False, "error": err})
try:
conn.select(f'"{folder}"', readonly=True)
status, msg_data = conn.uid("FETCH", str(uid), "(UID FLAGS RFC822)")
if status != "OK" or not msg_data or msg_data[0] is None:
return json.dumps({
"success": False, "error": f"Message UID {uid} not found"
})
parsed = _parse_fetch_response(msg_data)
if parsed is None:
return json.dumps({"success": False, "error": "Failed to parse original message"})
original_msg_id = parsed.get("message_id", "")
original_subject = parsed.get("subject", "")
original_from = parsed.get("from", "")
original_date = parsed.get("date", "")
conn.logout()
except imaplib.IMAP4.error as e:
try:
conn.logout()
except Exception:
pass
return json.dumps({"success": False, "error": f"IMAP error reading original: {e}"})
# Build thread headers
subject = original_subject
if not subject.lower().startswith("re:"):
subject = f"Re: {subject}"
smtp_conn, smtp_err = _open_smtp(cfg)
if smtp_err:
return json.dumps({"success": False, "error": smtp_err})
try:
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = cfg["account"]
msg["To"] = original_from
msg["Date"] = formatdate(localtime=True)
msg["In-Reply-To"] = original_msg_id
msg["References"] = original_msg_id
msg["X-Mailer"] = "hermes-proton-mail-skill"
errors, message_id = smtp_conn.send_message(
msg, from_addr=cfg["account"], to_addrs=[original_from])
return json.dumps({
"success": True,
"message_id": message_id,
"in_reply_to": original_msg_id,
"to": original_from,
"subject": subject,
})
except smtplib.SMTPException as e:
return json.dumps({"success": False, "error": f"SMTP error: {e}"})
finally:
try:
smtp_conn.quit()
except Exception:
pass