Build the proton-drive Hermes skill following the Phase 4 spec
from ARCHITECTURE.md (§5). Primary path: rclone protondrive backend
with Drive SDK as a fallback option.
Skill components:
- skills/proton-drive/SKILL.md — YAML frontmatter + full docs for
all 9 tools (list, read, download, upload, search, mkdir,
delete, stat, sync) with usage, error handling, security notes
- skills/proton-drive/__init__.py — package init with exports
- skills/proton-drive/tools.py — Python subprocess wrappers for
each tool, plus rclone availability/remote checks
- tests/test_drive.py — 25 unit tests (all pass) with mocked
subprocess.run
All 9 Proton Drive tools implemented:
proton_drive_list, proton_drive_read, proton_drive_download,
proton_drive_upload, proton_drive_search, proton_drive_mkdir,
proton_drive_delete, proton_drive_stat, proton_drive_sync
Signed-off-by: Bee <bee@trentuna.com>
641 lines
19 KiB
Python
641 lines
19 KiB
Python
"""Proton Drive tool implementations — subprocess wrappers around rclone protondrive.
|
|
|
|
All tools shell out to the rclone binary with a configured protondrive remote.
|
|
rclone handles all encryption, chunking, and API semantics transparently.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
DEFAULT_REMOTE = "protondrive"
|
|
RCLONE_MAX_READ_SIZE = 10 * 1024 * 1024 # 10 MB
|
|
RCLONE_TIMEOUT_LIST = 30
|
|
RCLONE_TIMEOUT_IO = 60
|
|
RCLONE_TIMEOUT_SYNC = 300
|
|
|
|
|
|
def _get_rclone_path() -> str:
|
|
"""Return the rclone binary path, checking env override first."""
|
|
return os.environ.get("PROTON_RCLONE_PATH", "rclone")
|
|
|
|
|
|
def _get_remote() -> str:
|
|
"""Return the configured protondrive remote name."""
|
|
return os.environ.get("PROTON_RCLONE_REMOTE", DEFAULT_REMOTE)
|
|
|
|
|
|
def _remote_path(path: str) -> str:
|
|
"""Join remote name and path into a single rclone argument."""
|
|
remote = _get_remote()
|
|
# Strip leading slash for path joining; rclone expects "remote:path"
|
|
clean = path.lstrip("/")
|
|
return f"{remote}:{clean}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _run_rclone(
|
|
args: list[str],
|
|
timeout: int = RCLONE_TIMEOUT_LIST,
|
|
stdin: Optional[bytes] = None,
|
|
) -> subprocess.CompletedProcess:
|
|
"""Run rclone with the given args and return the CompletedProcess."""
|
|
rclone = _get_rclone_path()
|
|
cmd = [rclone] + args
|
|
try:
|
|
return subprocess.run(
|
|
cmd,
|
|
input=stdin,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout,
|
|
)
|
|
except subprocess.TimeoutExpired:
|
|
return subprocess.CompletedProcess(
|
|
args=cmd,
|
|
returncode=-1,
|
|
stdout="",
|
|
stderr="rclone timed out after {timeout}s",
|
|
)
|
|
except FileNotFoundError:
|
|
return subprocess.CompletedProcess(
|
|
args=cmd,
|
|
returncode=-2,
|
|
stdout="",
|
|
stderr=f"rclone not found at '{rclone}'. Install: https://rclone.org/install",
|
|
)
|
|
|
|
|
|
def _parse_lsf_json(output: str) -> list[dict]:
|
|
"""Parse newline-delimited JSON output from `rclone lsf --json`."""
|
|
items = []
|
|
for line in output.strip().split("\n"):
|
|
line = line.strip()
|
|
if line:
|
|
try:
|
|
items.append(json.loads(line))
|
|
except json.JSONDecodeError:
|
|
items.append({"Name": line, "raw": True})
|
|
return items
|
|
|
|
|
|
def _parse_lsl_output(output: str) -> list[dict]:
|
|
"""Parse tabular output from `rclone lsl`."""
|
|
items = []
|
|
for line in output.strip().split("\n"):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
# rclone lsl format: <size> <modtime> <path>
|
|
parts = line.split(maxsplit=2)
|
|
if len(parts) >= 3:
|
|
items.append({
|
|
"Size": parts[0],
|
|
"ModTime": parts[1],
|
|
"Path": parts[2],
|
|
})
|
|
elif len(parts) >= 1:
|
|
items.append({"Size": parts[0]})
|
|
return items
|
|
|
|
|
|
def _check_rclone_available() -> str | None:
|
|
"""Check if rclone binary is reachable. Returns None if OK, error string if not."""
|
|
rclone = _get_rclone_path()
|
|
try:
|
|
result = subprocess.run(
|
|
[rclone, "version"],
|
|
capture_output=True, text=True, timeout=5,
|
|
)
|
|
if result.returncode != 0:
|
|
return f"rclone execution failed: {result.stderr.strip()}"
|
|
return None
|
|
except FileNotFoundError:
|
|
return f"rclone not found at '{rclone}'. Install from https://rclone.org/install"
|
|
except subprocess.TimeoutExpired:
|
|
return "rclone version check timed out"
|
|
|
|
|
|
def _check_remote_exists(remote: str) -> str | None:
|
|
"""Check if the named remote is configured. Returns None if OK, error string if not."""
|
|
result = _run_rclone(["listremotes"], timeout=5)
|
|
if f"{remote}:" in result.stdout:
|
|
return None
|
|
return (
|
|
f"rclone remote '{remote}' not configured. "
|
|
f"Run: rclone config create {remote} protondrive username=your@proton.me"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API / Tool Handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def check_rclone_availability() -> dict:
|
|
"""Verify rclone is installed and the protondrive remote is configured.
|
|
|
|
Returns:
|
|
dict with status summary and any errors found
|
|
"""
|
|
errors = []
|
|
rclone_err = _check_rclone_available()
|
|
if rclone_err:
|
|
return {"available": False, "errors": [rclone_err]}
|
|
|
|
remote = _get_remote()
|
|
remote_err = _check_remote_exists(remote)
|
|
if remote_err:
|
|
errors.append(remote_err)
|
|
|
|
return {
|
|
"available": len(errors) == 0,
|
|
"rclone": "ok",
|
|
"remote": remote,
|
|
"remote_configured": remote_err is None,
|
|
"errors": errors or None,
|
|
}
|
|
|
|
|
|
def check_rclone_remote() -> dict:
|
|
"""Check rclone remote configuration status. Legacy alias for environment checks."""
|
|
remote = _get_remote()
|
|
err = _check_remote_exists(remote)
|
|
return {
|
|
"remote": remote,
|
|
"configured": err is None,
|
|
"error": err,
|
|
}
|
|
|
|
|
|
def get_rclone_remote() -> str:
|
|
"""Return the currently configured rclone remote name."""
|
|
return _get_remote()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_list(
|
|
path: str = "/",
|
|
recursive: bool = False,
|
|
dirs_only: bool = False,
|
|
files_only: bool = False,
|
|
) -> dict:
|
|
"""List files and folders at the given path on Proton Drive.
|
|
|
|
Args:
|
|
path: Directory path on Proton Drive (default: root "/")
|
|
recursive: List all subdirectories recursively
|
|
dirs_only: Show directories only
|
|
files_only: Show files only
|
|
|
|
Returns:
|
|
dict with items array or error
|
|
"""
|
|
args = ["lsf", "--json"]
|
|
if recursive:
|
|
args.append("-R")
|
|
if dirs_only:
|
|
args.append("--dirs-only")
|
|
if files_only:
|
|
args.append("--files-only")
|
|
|
|
rp = _remote_path(path)
|
|
args.append(rp)
|
|
|
|
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_LIST)
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
|
|
items = _parse_lsf_json(result.stdout)
|
|
return {"items": items, "count": len(items), "path": path}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_read
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_read(
|
|
path: str,
|
|
head: Optional[int] = None,
|
|
tail: Optional[int] = None,
|
|
) -> dict:
|
|
"""Read the contents of a text file from Proton Drive.
|
|
|
|
For files >10MB, returns an error suggesting proton_drive_download instead.
|
|
|
|
Args:
|
|
path: Path to the file on Proton Drive (e.g. "Documents/notes.txt")
|
|
head: If set, read only the first N lines
|
|
tail: If set, read only the last N lines
|
|
|
|
Returns:
|
|
dict with content or error
|
|
"""
|
|
# Check file size first via stat
|
|
stat_result = proton_drive_stat(path)
|
|
if "error" in stat_result:
|
|
return stat_result
|
|
|
|
# stat_result may have "Size" (from lsl) or "size" (from lsf --json)
|
|
raw_size = stat_result.get("Size") or stat_result.get("size") or 0
|
|
try:
|
|
size_int = int(raw_size)
|
|
except (ValueError, TypeError):
|
|
size_int = 0
|
|
|
|
if size_int > RCLONE_MAX_READ_SIZE:
|
|
return {
|
|
"error": (
|
|
f"File is {size_int} bytes, exceeding {RCLONE_MAX_READ_SIZE} byte "
|
|
f"inline read limit. Use proton_drive_download instead."
|
|
),
|
|
"size_bytes": size_int,
|
|
}
|
|
|
|
rp = _remote_path(path)
|
|
result = _run_rclone(["cat", rp], timeout=RCLONE_TIMEOUT_IO)
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
|
|
content = result.stdout
|
|
if head is not None:
|
|
lines = content.split("\n")
|
|
content = "\n".join(lines[:head])
|
|
elif tail is not None:
|
|
lines = content.split("\n")
|
|
content = "\n".join(lines[-tail:])
|
|
|
|
return {
|
|
"content": content,
|
|
"size_bytes": len(content.encode("utf-8")),
|
|
"path": path,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_download
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_download(
|
|
remote_path: str,
|
|
local_path: str,
|
|
progress: bool = False,
|
|
) -> dict:
|
|
"""Download a file from Proton Drive to the local filesystem.
|
|
|
|
Args:
|
|
remote_path: Source path on Proton Drive
|
|
local_path: Destination path on local filesystem (absolute or ~/expanded)
|
|
progress: Show rclone transfer progress output
|
|
|
|
Returns:
|
|
dict with status, local path, and size
|
|
"""
|
|
expanded_local = os.path.expanduser(local_path)
|
|
rp = _remote_path(remote_path)
|
|
|
|
# If local_path ends with /, treat as directory
|
|
if local_path.endswith("/") or local_path.endswith(os.sep):
|
|
os.makedirs(expanded_local, exist_ok=True)
|
|
dest = expanded_local
|
|
else:
|
|
# Ensure parent dir exists
|
|
parent = os.path.dirname(expanded_local)
|
|
if parent:
|
|
os.makedirs(parent, exist_ok=True)
|
|
dest = expanded_local
|
|
|
|
args = ["copy"]
|
|
if progress:
|
|
args.append("--progress")
|
|
args.extend([rp, dest])
|
|
|
|
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_IO)
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
|
|
# Determine the actual file that was written
|
|
if os.path.isdir(dest):
|
|
filename = os.path.basename(remote_path.rstrip("/"))
|
|
actual_path = os.path.join(dest, filename)
|
|
else:
|
|
actual_path = dest
|
|
|
|
size = 0
|
|
if os.path.isfile(actual_path):
|
|
size = os.path.getsize(actual_path)
|
|
|
|
return {
|
|
"status": "downloaded",
|
|
"local_path": actual_path,
|
|
"size_bytes": size,
|
|
"remote_path": remote_path,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_upload
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_upload(
|
|
local_path: str,
|
|
remote_path: str,
|
|
create_parents: bool = True,
|
|
progress: bool = False,
|
|
) -> dict:
|
|
"""Upload a file or directory from the local filesystem to Proton Drive.
|
|
|
|
Args:
|
|
local_path: Source path on local filesystem
|
|
remote_path: Destination path on Proton Drive
|
|
create_parents: Create parent dirs if they don't exist (default: True)
|
|
progress: Show rclone transfer progress output
|
|
|
|
Returns:
|
|
dict with status, remote path, and size
|
|
"""
|
|
expanded_local = os.path.expanduser(local_path)
|
|
|
|
if not os.path.exists(expanded_local):
|
|
return {"error": f"Local path '{expanded_local}' does not exist"}
|
|
|
|
if create_parents:
|
|
# Ensure the remote parent path exists by creating it
|
|
parent_remote = remote_path.rstrip("/")
|
|
# If remote_path looks like a file path, get its parent dir
|
|
if not remote_path.endswith("/"):
|
|
parent_dir = os.path.dirname(remote_path) or "/"
|
|
else:
|
|
parent_dir = remote_path
|
|
|
|
mkdir_rp = _remote_path(parent_dir)
|
|
_run_rclone(["mkdir", mkdir_rp], timeout=RCLONE_TIMEOUT_LIST)
|
|
|
|
rp = _remote_path(remote_path)
|
|
args = ["copy"]
|
|
if progress:
|
|
args.append("--progress")
|
|
args.extend([expanded_local, rp])
|
|
|
|
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_IO)
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
|
|
file_size = 0
|
|
if os.path.isfile(expanded_local):
|
|
file_size = os.path.getsize(expanded_local)
|
|
|
|
return {
|
|
"status": "uploaded",
|
|
"remote_path": remote_path,
|
|
"local_path": expanded_local,
|
|
"size_bytes": file_size,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_search(
|
|
query: str,
|
|
path: str = "/",
|
|
regex: bool = False,
|
|
max_results: int = 50,
|
|
) -> dict:
|
|
"""Search for files by name across Proton Drive.
|
|
|
|
Uses `rclone lsf -R --files-only --json` piped through a name filter.
|
|
|
|
Args:
|
|
query: Search term (substring) or regex pattern
|
|
path: Root path to search under (default: "/")
|
|
regex: If True, treat query as regex instead of substring match
|
|
max_results: Maximum number of results to return
|
|
|
|
Returns:
|
|
dict with matching items array
|
|
"""
|
|
rp = _remote_path(path)
|
|
result = _run_rclone(
|
|
["lsf", "-R", "--files-only", "--json", rp],
|
|
timeout=RCLONE_TIMEOUT_LIST,
|
|
)
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
|
|
all_items = _parse_lsf_json(result.stdout)
|
|
|
|
if regex:
|
|
try:
|
|
pattern = re.compile(query, re.IGNORECASE)
|
|
except re.error as e:
|
|
return {"error": f"Invalid regex: {e}"}
|
|
matches = [it for it in all_items if pattern.search(it.get("Name", ""))]
|
|
else:
|
|
q = query.lower()
|
|
matches = [it for it in all_items if q in it.get("Name", "").lower()]
|
|
|
|
limited = matches[:max_results]
|
|
|
|
return {
|
|
"items": limited,
|
|
"count": len(limited),
|
|
"total_matches": len(matches),
|
|
"query": query,
|
|
"path": path,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_mkdir
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_mkdir(path: str) -> dict:
|
|
"""Create a folder on Proton Drive.
|
|
|
|
Creates all parent directories if they don't exist (like mkdir -p).
|
|
|
|
Args:
|
|
path: Path of folder to create (e.g. "Documents/NewFolder")
|
|
|
|
Returns:
|
|
dict with status
|
|
"""
|
|
rp = _remote_path(path)
|
|
result = _run_rclone(["mkdir", rp], timeout=RCLONE_TIMEOUT_LIST)
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
return {"status": "created", "path": path}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_delete
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_delete(path: str, recursive: bool = False) -> dict:
|
|
"""Delete a file or folder from Proton Drive.
|
|
|
|
Non-recursive deletion of non-empty folders will fail.
|
|
|
|
Args:
|
|
path: Path to the file or folder to delete
|
|
recursive: Recursively delete folder contents
|
|
|
|
Returns:
|
|
dict with status
|
|
"""
|
|
rp = _remote_path(path)
|
|
|
|
if recursive:
|
|
# rclone purge removes a directory and all its contents
|
|
result = _run_rclone(["purge", rp], timeout=RCLONE_TIMEOUT_IO)
|
|
else:
|
|
# rclone delete removes files only (fails on non-empty dirs)
|
|
result = _run_rclone(["delete", rp], timeout=RCLONE_TIMEOUT_IO)
|
|
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
return {"status": "deleted", "path": path, "recursive": recursive}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_stat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_stat(path: str) -> dict:
|
|
"""Get detailed metadata for a file or folder.
|
|
|
|
Args:
|
|
path: Path to the file or folder on Proton Drive
|
|
|
|
Returns:
|
|
dict with metadata (Name, Path, Size, ModTime, IsDir, etc.)
|
|
"""
|
|
# Use lsl for size + modtime, then lsf --json for structured info
|
|
rp = _remote_path(path)
|
|
lsl_result = _run_rclone(["lsl", rp], timeout=RCLONE_TIMEOUT_LIST)
|
|
|
|
if lsl_result.returncode != 0:
|
|
# Try lsf instead (might be a directory)
|
|
lsf_result = _run_rclone(
|
|
["lsf", "--json", rp], timeout=RCLONE_TIMEOUT_LIST
|
|
)
|
|
if lsf_result.returncode != 0:
|
|
return {"error": lsf_result.stderr.strip()}
|
|
|
|
items = _parse_lsf_json(lsf_result.stdout)
|
|
if items:
|
|
return items[0]
|
|
return {"path": path, "IsDir": True}
|
|
|
|
parsed = _parse_lsl_output(lsl_result.stdout)
|
|
if parsed:
|
|
info = parsed[0]
|
|
info["path"] = path
|
|
info["IsDir"] = False
|
|
return info
|
|
|
|
return {"path": path, "error": "no metadata returned"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# proton_drive_sync
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def proton_drive_sync(
|
|
source: str,
|
|
dest: str,
|
|
dry_run: bool = True,
|
|
delete_excluded: bool = False,
|
|
) -> dict:
|
|
"""Synchronize a local directory with Proton Drive.
|
|
|
|
WARNING: Defaults to dry_run=True. Must confirm before live sync,
|
|
especially with delete_excluded=True.
|
|
|
|
Args:
|
|
source: Source path (local:/path or remote:path)
|
|
dest: Destination path (local:/path or remote:path)
|
|
dry_run: If True, show what would change without applying (default: True)
|
|
delete_excluded: Delete files at dest not present at source
|
|
|
|
Returns:
|
|
dict with sync report
|
|
"""
|
|
args = ["sync"]
|
|
|
|
if dry_run:
|
|
args.append("--dry-run")
|
|
if delete_excluded:
|
|
args.append("--delete-excluded")
|
|
|
|
# Auto-expand ~ in local paths
|
|
src = source if ":" in source else os.path.expanduser(source)
|
|
dst = dest if ":" in dest else os.path.expanduser(dest)
|
|
|
|
args.extend([src, dst])
|
|
|
|
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_SYNC)
|
|
if result.returncode != 0:
|
|
return {"error": result.stderr.strip()}
|
|
|
|
# Parse output for summary information
|
|
summary = _parse_sync_output(result.stdout, result.stderr)
|
|
|
|
return {
|
|
"status": "dry_run" if dry_run else "synced",
|
|
"source": source,
|
|
"dest": dest,
|
|
"dry_run": dry_run,
|
|
"delete_excluded": delete_excluded,
|
|
**summary,
|
|
}
|
|
|
|
|
|
def _parse_sync_output(stdout: str, stderr: str) -> dict:
|
|
"""Extract transfer summary from rclone sync output."""
|
|
changes = []
|
|
errors = []
|
|
|
|
for line in stdout.split("\n"):
|
|
line = line.strip()
|
|
if line:
|
|
changes.append(line)
|
|
|
|
# rclone reports errors on stderr
|
|
for line in stderr.split("\n"):
|
|
line = line.strip()
|
|
if line and ("ERROR" in line.upper() or "failed" in line.lower()):
|
|
errors.append(line)
|
|
|
|
return {
|
|
"changes_log": changes,
|
|
"errors": errors or None,
|
|
"change_count": len(changes),
|
|
"error_count": len(errors),
|
|
}
|