#!/usr/bin/env python3 """ Proton VPN Hermes skill — tool implementations. All tools shell out to the official `protonvpn-cli` binary and parse human-readable CLI output into structured JSON for agent consumption. Requires: protonvpn-cli >= 1.0.0 (installed via Proton repos) See SKILL.md for installation and setup instructions. """ import json import os import re import shlex import subprocess from datetime import datetime from typing import Any # ── helpers ──────────────────────────────────────────────────────────── def _run_vpn(args: list[str], timeout: int = 30) -> dict: """Run a `protonvpn-cli` command and return a structured result dict.""" cmd = ["protonvpn-cli"] + args try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) out = result.stdout.strip() err = result.stderr.strip() if result.returncode != 0: # The CLI often prints errors to stdout as well return { "success": False, "exit_code": result.returncode, "error": err or out, "command": " ".join(shlex.quote(s) for s in cmd), } return { "success": True, "output": out, "command": " ".join(shlex.quote(s) for s in cmd), } except subprocess.TimeoutExpired: return { "success": False, "error": f"Command timed out after {timeout}s: {' '.join(shlex.quote(s) for s in cmd)}", } except FileNotFoundError: return { "success": False, "error": ( "protonvpn-cli not found. Install via Proton repos:\n" " https://protonvpn.com/support/linux-cli" ), } except OSError as exc: return { "success": False, "error": str(exc), } def _check_vpn_privileges() -> dict | None: """ Check that the current user can use protonvpn-cli. Returns an error dict if the user lacks privileges, or None if OK. """ if os.geteuid() == 0: return None # root can always run VPN commands try: groups_out = subprocess.run( ["groups"], capture_output=True, text=True, timeout=5 ).stdout.strip() except Exception as exc: return {"success": False, "error": f"Cannot check groups: {exc}"} if "protonvpn" not in groups_out: return { "success": False, "error": ( "User is not in the 'protonvpn' group. " "Run: sudo usermod -aG protonvpn $USER && log out and back in." ), } return None def _privilege_guard() -> dict | None: """Run privilege check and return a structured error if it fails.""" err = _check_vpn_privileges() if err is not None: return err return None # ── output parsers ───────────────────────────────────────────────────── def _parse_status(raw: str) -> dict: """ Parse `protonvpn-cli status` output. Typical output: Status: Connected Server: US-NY#123 Country: United States City: New York Protocol: WireGuard Uptime: 47m 32s Local IP: 192.168.1.42 Public IP: 10.2.3.4 """ result = {"connected": False} for line in raw.splitlines(): line = line.strip() if not line or ":" not in line: continue key, _, value = line.partition(":") key = key.strip().lower().replace(" ", "_") value = value.strip() if key == "status": result["connected"] = value.lower() == "connected" result["status"] = value elif key == "server": result["server_name"] = value elif key == "country": result["country"] = value elif key == "city": result["city"] = value elif key == "protocol": result["protocol"] = value elif key == "uptime": result["uptime"] = value elif key == "local_ip": result["local_ip"] = value elif key == "public_ip": result["public_ip"] = value return result def _parse_servers(raw: str) -> list[dict]: """ Parse `protonvpn-cli servers` output (table format). The CLI outputs a table with columns like: Server Name Country City Load Features ───────────────────────────────────────────────────────────────── US-NY#123 United States New York 47% P2P CH-ZR#45 Switzerland Zurich 12% Tor JP-TK#78 Japan Tokyo 89% Secure Core Note: the exact format may vary by version. This parser is heuristic. """ lines = raw.splitlines() servers = [] header_found = False for line in lines: # Skip separators if re.match(r"^[─\-\s]+$", line): continue # Skip empty lines and banner if not line.strip() or line.strip().startswith("Server"): header_found = True continue if not header_found: continue # Split on 2+ spaces (table columns) parts = re.split(r"\s{2,}", line.strip()) if len(parts) < 3: continue entry = { "name": parts[0].strip(), "country": parts[1].strip() if len(parts) > 1 else "", "city": parts[2].strip() if len(parts) > 2 else "", } if len(parts) > 3: load_str = parts[3].strip().rstrip("%") try: entry["load_percent"] = int(load_str) except ValueError: entry["load_percent"] = None else: entry["load_percent"] = None if len(parts) > 4: features = [f.strip().lower() for f in parts[4].split(",") if f.strip()] entry["features"] = features else: entry["features"] = [] servers.append(entry) return servers def _parse_config(raw: str) -> dict: """ Parse `protonvpn-cli config --list` or `protonvpn-cli settings` output. Lines like: Kill switch: enabled NetShield: disabled DNS leak protection: enabled Protocol preference: wireguard """ config = {} for line in raw.splitlines(): if ":" not in line: continue key, _, value = line.partition(":") config[key.strip().lower().replace(" ", "_")] = value.strip() return config # ── tool handlers ────────────────────────────────────────────────────── def proton_vpn_login(args: dict) -> dict: """ Authenticate with Proton VPN. NOTE: This opens a browser for OAuth and requires a desktop session. It will not work in headless environments. """ username = args.get("username", "") cmd_args = ["login"] if username: cmd_args.append(username) result = _run_vpn(cmd_args, timeout=60) # login can be slow (browser wait) if not result["success"]: return result return { "success": True, "message": "Login initiated. Follow the browser prompt to authenticate.", "output": result["output"], "note": ( "Proton VPN CLI uses browser-based OAuth. " "This requires a desktop session with a browser available." ), } def proton_vpn_logout(args: dict) -> dict: """Log out from Proton VPN, clearing stored credentials.""" priv = _privilege_guard() if priv: return priv result = _run_vpn(["logout"]) if not result["success"]: return result return { "success": True, "message": "Logged out of Proton VPN. Credentials cleared.", } def proton_vpn_connect(args: dict) -> dict: """ Connect to a Proton VPN server. Supports: fastest (default), random, country, city, P2P, Tor, Secure Core. """ priv = _privilege_guard() if priv: return priv cmd_args = ["connect"] selection = args.get("selection", "fastest") # Server name takes precedence server_name = args.get("server") if server_name: cmd_args.append(server_name) elif selection == "fastest": cmd_args.append("--fastest") elif selection == "random": cmd_args.append("--random") elif selection == "country": country = args.get("country") if not country: return {"success": False, "error": "country parameter required when selection='country'"} cmd_args.append(f"--country={country}") elif selection == "city": city = args.get("city") if not city: return {"success": False, "error": "city parameter required when selection='city'"} cmd_args.append(f"--city={city}") elif selection == "p2p": cmd_args.append("--p2p") elif selection == "tor": cmd_args.append("--tor") elif selection == "free": cmd_args.append("--free") elif selection == "secure-core": cmd_args.append("--secure-core") # Optional protocol protocol = args.get("protocol") if protocol: cmd_args.extend(["--protocol", protocol]) # Optional persistent flag if args.get("persistent"): cmd_args.append("--persistent") result = _run_vpn(cmd_args, timeout=45) if not result["success"]: return result return { "success": True, "action": "connect", "selection": selection, "message": result["output"], } def proton_vpn_disconnect(args: dict) -> dict: """Disconnect the current VPN session.""" priv = _privilege_guard() if priv: return priv result = _run_vpn(["disconnect"]) if not result["success"]: return result return { "success": True, "action": "disconnect", "message": "VPN disconnected.", "output": result["output"], } def proton_vpn_status(args: dict) -> dict: """Get current VPN connection status.""" result = _run_vpn(["status"]) if not result["success"]: return result parsed = _parse_status(result["output"]) return { "success": True, **parsed, } def proton_vpn_servers(args: dict) -> dict: """List available Proton VPN servers.""" result = _run_vpn(["servers"], timeout=60) if not result["success"]: return result servers = _parse_servers(result["output"]) # Apply filters country_filter = args.get("country") if country_filter: country_upper = country_filter.upper() servers = [s for s in servers if s.get("country", "").upper() == country_upper] features_filter = args.get("features") if features_filter: if isinstance(features_filter, str): features_filter = [features_filter] features_lower = [f.lower() for f in features_filter] servers = [ s for s in servers if any(f in [sf.lower() for sf in s.get("features", [])] for f in features_lower) ] fmt = args.get("format", "json") return { "success": True, "total": len(servers), "servers": servers, "format": fmt, } def proton_vpn_killswitch(args: dict) -> dict: """Enable or disable the VPN kill switch.""" priv = _privilege_guard() if priv: return priv state = args.get("state", "").lower() if state not in ("on", "off"): return { "success": False, "error": "state must be 'on' or 'off'", } result = _run_vpn(["settings", f"--killswitch={state}"]) if not result["success"]: return result return { "success": True, "killswitch": state, "message": f"Kill switch {state == 'on' and 'enabled' or 'disabled'}.", "output": result["output"], } def proton_vpn_config(args: dict) -> dict: """View or modify VPN configuration.""" priv = _privilege_guard() if priv: return priv action = args.get("action", "view") if action == "view": result = _run_vpn(["config", "--list"]) if not result["success"]: return result parsed = _parse_config(result["output"]) return { "success": True, "action": "view", "config": parsed, } value = args.get("value", "") if action == "set-dns": result = _run_vpn(["settings", f"--custom-dns={value}"]) elif action == "set-netshield": if value not in ("on", "off", "strict"): return {"success": False, "error": "NetShield value must be 'on', 'off', or 'strict'"} result = _run_vpn(["settings", f"--netshield={value}"]) elif action == "set-protocol": if value not in ("wireguard", "openvpn_udp", "openvpn_tcp"): return {"success": False, "error": "Protocol must be 'wireguard', 'openvpn_udp', or 'openvpn_tcp'"} result = _run_vpn(["settings", f"--protocol={value}"]) else: return {"success": False, "error": f"Unknown action: {action}"} if not result["success"]: return result return { "success": True, "action": action, "value": value, "message": f"Configuration updated: {action} = {value}", } def proton_vpn_refresh(args: dict) -> dict: """Refresh server list and VPN config from Proton's API.""" priv = _privilege_guard() if priv: return priv result = _run_vpn(["refresh"], timeout=60) if not result["success"]: return result return { "success": True, "message": "Server list and configuration refreshed.", "output": result["output"], } # ── tool registry ────────────────────────────────────────────────────── TOOLS: dict[str, dict[str, Any]] = { "proton_vpn_login": { "handler": proton_vpn_login, "schema": { "name": "proton_vpn_login", "description": "Authenticate with Proton VPN. Requires a desktop session for browser OAuth.", "parameters": { "type": "object", "properties": { "username": { "type": "string", "description": "Proton account username/email (optional)." } } } } }, "proton_vpn_logout": { "handler": proton_vpn_logout, "schema": { "name": "proton_vpn_logout", "description": "Log out from Proton VPN. Clears stored credentials and disconnects.", "parameters": {"type": "object", "properties": {}} } }, "proton_vpn_connect": { "handler": proton_vpn_connect, "schema": { "name": "proton_vpn_connect", "description": "Connect to a Proton VPN server. Supports fastest, random, country, city, P2P, Tor, Secure Core, and free server selection.", "parameters": { "type": "object", "properties": { "server": { "type": "string", "description": "Server name or ID (e.g., 'US-NY#1')." }, "selection": { "type": "string", "enum": ["fastest", "random", "country", "city", "p2p", "tor", "free", "secure-core"], "description": "Server selection strategy. Default: fastest." }, "country": { "type": "string", "description": "Two-letter country code (e.g., 'US'). Used with selection='country'." }, "city": { "type": "string", "description": "City name (e.g., 'New York'). Used with selection='city'." }, "protocol": { "type": "string", "enum": ["wireguard", "openvpn_udp", "openvpn_tcp"], "description": "VPN protocol." }, "persistent": { "type": "boolean", "description": "Auto-reconnect on disconnect." } } } } }, "proton_vpn_disconnect": { "handler": proton_vpn_disconnect, "schema": { "name": "proton_vpn_disconnect", "description": "Disconnect the current VPN session and restore normal network connectivity.", "parameters": {"type": "object", "properties": {}} } }, "proton_vpn_status": { "handler": proton_vpn_status, "schema": { "name": "proton_vpn_status", "description": "Check current VPN connection status — server, protocol, uptime, IP information.", "parameters": {"type": "object", "properties": {}} } }, "proton_vpn_servers": { "handler": proton_vpn_servers, "schema": { "name": "proton_vpn_servers", "description": "List available Proton VPN servers with country, city, load, and features.", "parameters": { "type": "object", "properties": { "country": { "type": "string", "description": "Filter by two-letter country code (e.g., 'US')." }, "features": { "type": "array", "items": { "type": "string", "enum": ["p2p", "tor", "secure-core", "free"] }, "description": "Filter by required features." }, "format": { "type": "string", "enum": ["table", "json"], "description": "Output format. 'json' (default) returns structured data." } } } } }, "proton_vpn_killswitch": { "handler": proton_vpn_killswitch, "schema": { "name": "proton_vpn_killswitch", "description": "Enable or disable the VPN kill switch. When enabled, all internet traffic is blocked outside the VPN tunnel.", "parameters": { "type": "object", "properties": { "state": { "type": "string", "enum": ["on", "off"], "description": "Kill switch state." } }, "required": ["state"] } } }, "proton_vpn_config": { "handler": proton_vpn_config, "schema": { "name": "proton_vpn_config", "description": "View current Proton VPN configuration or modify DNS, NetShield, and protocol settings.", "parameters": { "type": "object", "properties": { "action": { "type": "string", "enum": ["view", "set-dns", "set-netshield", "set-protocol"], "description": "'view' (default): show config. 'set-*': modify setting." }, "value": { "type": "string", "description": "Setting value. For DNS: comma-separated IPs. For NetShield: 'on'/'off'/'strict'. For protocol: 'wireguard'/'openvpn_udp'/'openvpn_tcp'." } } } } }, "proton_vpn_refresh": { "handler": proton_vpn_refresh, "schema": { "name": "proton_vpn_refresh", "description": "Refresh server list and VPN configuration from Proton's API.", "parameters": {"type": "object", "properties": {}} } }, } def dispatch(tool_name: str, args: dict | None = None) -> str: """ Dispatch a tool by name with the given arguments. Returns a JSON string (Hermes tools return strings, not dicts). """ if args is None: args = {} tool = TOOLS.get(tool_name) if tool is None: return json.dumps({ "success": False, "error": f"Unknown tool: {tool_name}. Available: {', '.join(sorted(TOOLS.keys()))}", }) try: result = tool["handler"](args) return json.dumps(result, indent=2, default=str) except Exception as exc: return json.dumps({ "success": False, "error": f"{tool_name} raised: {exc}", }, indent=2) # ── CLI entry point (for testing) ────────────────────────────────────── def main(): """CLI entry point for testing tools from the command line.""" import sys if len(sys.argv) < 2: print(f"Usage: {sys.argv[0]} [json_args]") print(f"Tools: {', '.join(sorted(TOOLS.keys()))}") sys.exit(1) tool_name = sys.argv[1] args = {} if len(sys.argv) > 2: try: args = json.loads(sys.argv[2]) except json.JSONDecodeError as exc: print(json.dumps({"error": f"Invalid JSON args: {exc}"})) sys.exit(1) result = dispatch(tool_name, args) print(result) if __name__ == "__main__": main()