feat: Proton Drive Hermes skill — rclone-backed file operations

Build the proton-drive Hermes skill following the Phase 4 spec
from ARCHITECTURE.md (§5). Primary path: rclone protondrive backend
with Drive SDK as a fallback option.

Skill components:
  - skills/proton-drive/SKILL.md — YAML frontmatter + full docs for
    all 9 tools (list, read, download, upload, search, mkdir,
    delete, stat, sync) with usage, error handling, security notes
  - skills/proton-drive/__init__.py — package init with exports
  - skills/proton-drive/tools.py — Python subprocess wrappers for
    each tool, plus rclone availability/remote checks
  - tests/test_drive.py — 25 unit tests (all pass) with mocked
    subprocess.run

All 9 Proton Drive tools implemented:
  proton_drive_list, proton_drive_read, proton_drive_download,
  proton_drive_upload, proton_drive_search, proton_drive_mkdir,
  proton_drive_delete, proton_drive_stat, proton_drive_sync

Signed-off-by: Bee <bee@trentuna.com>
This commit is contained in:
exe.dev user 2026-06-08 18:30:26 +02:00
parent c332322220
commit f103d5f44f
No known key found for this signature in database
5 changed files with 1223 additions and 14 deletions

20
.gitignore vendored
View file

@ -1,26 +1,18 @@
# Python # Python
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*.pyo *.so
*.egg-info/
dist/
build/
# Environment # Environment
.env .env
venv/ *.env.local
.venv/
# OS
.DS_Store
Thumbs.db
# IDE # IDE
.vscode/ .vscode/
.idea/ .idea/
*.swp *.swp
*.swo *.swo
# OS
.DS_Store
Thumbs.db
# Debug
debug_*.py
tmp/

View file

@ -0,0 +1,268 @@
---
name: proton-drive
description: Hermes skill for Proton Drive — list, read, upload, download, search, and manage files using the rclone protondrive backend.
version: 1.0.0
author: Trentuna / Bee
license: MIT
platforms: [linux, macos, windows]
metadata:
hermes:
tags: [proton, drive, cloud-storage, rclone, file-management]
related_skills: [proton-mail, proton-pass, proton-vpn]
required_environment_variables:
- PROTON_RCLONE_REMOTE
optional_environment_variables:
- PROTON_RCLONE_PATH
- PROTON_DRIVE_USE_SDK
---
# Proton Drive Skill
> Agents can read, write, search, and manage files on Proton Drive through
> the battle-tested [rclone protondrive backend](https://rclone.org/protondrive/).
## Prerequisites
### 1. Install rclone
```bash
# Linux (official script)
curl https://rclone.org/install.sh | sudo bash
# macOS
brew install rclone
# Verify
rclone version
```
### 2. Configure the protondrive remote
```bash
rclone config create protondrive protondrive \
username=your@proton.me \
password=your_password \
--non-interactive
```
Or interactively:
```bash
rclone config
# → Choose "n" (new remote)
# → Name: protondrive
# → Type: protondrive
# → Follow prompts for username, password, 2FA
```
### 3. Set environment variables (optional)
```bash
export PROTON_RCLONE_REMOTE=protondrive # default: protondrive
export PROTON_RCLONE_PATH=/path/to/rclone # default: auto-detect from PATH
```
On first run, rclone will prompt for the mailbox password (your Proton
login password). After authentication, tokens are cached locally by
rclone's own credential store — no additional auth management needed
for subsequent calls.
## Tools
The skill provides 9 tools for interacting with Proton Drive. Each tool
shells out to `rclone` with the configured protondrive remote.
### `proton_drive_list`
List files and folders in a path on Proton Drive.
```
Usage: rclone lsf --json <remote>:<path>
Args:
path (string, default: "/") — Path to list (e.g. "Documents" or "/")
recursive (bool, default: false) — List recursively
dirs_only (bool, default: false) — Show directories only
files_only (bool, default: false) — Show files only
Returns: JSON array of {Name, Path, Size, ModTime, IsDir} items
```
### `proton_drive_read`
Read a file's content from Proton Drive.
```
Usage: rclone cat <remote>:<path>
Args:
path (string, required) — Path to the file (e.g. "Documents/notes.txt")
head (int, optional) — Only read first N lines
tail (int, optional) — Only read last N lines
Returns: File content as text string (up to 10MB; larger files use download instead)
```
Large files (>10MB) should be downloaded rather than read inline. The
tool will return an error suggesting `proton_drive_download` for files
exceeding the size threshold.
### `proton_drive_download`
Download a file from Proton Drive to a local path.
```
Usage: rclone copy <remote>:<path> <local_path>
Args:
remote_path (string, required) — Source path on Drive (e.g. "Documents/report.pdf")
local_path (string, required) — Destination local path (absolute or ~/expanded)
progress (bool, default: false) — Show transfer progress
Returns: JSON with {status, local_path, size_bytes}
```
The local path must be writable. If the path ends with `/`, the file
is downloaded into that directory preserving its filename.
### `proton_drive_upload`
Upload a local file or directory to Proton Drive.
```
Usage: rclone copy <local_path> <remote>:<path>
Args:
local_path (string, required) — Source path on local filesystem
remote_path (string, required) — Destination path on Drive (e.g. "Documents/")
create_parents (bool, default: true) — Create parent dirs if they don't exist
progress (bool, default: false) — Show transfer progress
Returns: JSON with {status, remote_path, size_bytes}
```
Supports both individual files and entire directories.
### `proton_drive_search`
Search for files by name across Proton Drive.
```
Usage: rclone lsf -R --files-only <remote>: | grep -i <query>
OR (for structured results): rclone lsf -R --json <remote>: | jq <filter>
Args:
query (string, required) — Search term or regex pattern
path (string, default: "/") — Root path to search under
regex (bool, default: false) — Treat query as regex instead of substring
max_results (int, default: 50) — Max results to return
Returns: JSON array of matching {Name, Path, Size, ModTime}
```
### `proton_drive_mkdir`
Create a folder on Proton Drive.
```
Usage: rclone mkdir <remote>:<path>
Args:
path (string, required) — Path of folder to create (e.g. "Documents/NewFolder")
Returns: JSON with {status: "created", path}
```
Creates all parent directories if they don't exist (like `mkdir -p`).
### `proton_drive_delete`
Delete a file or empty folder from Proton Drive.
```
Usage: rclone delete <remote>:<path>
Args:
path (string, required) — Path to the file/folder to delete
recursive (bool, default: false) — Recursively delete folder contents
Returns: JSON with {status: "deleted", path}
```
Non-recursive deletion of non-empty folders will fail. rclone's
`purge` command is used for recursive deletes.
### `proton_drive_stat`
Get detailed metadata for a file or folder.
```
Usage: rclone lsl <remote>:<path>
OR (detailed): rclone lsf --json <remote>:<path>
Args:
path (string, required) — Path to the file or folder
Returns: JSON with {Name, Path, Size, ModTime, IsDir, Hash, MimeType}
```
### `proton_drive_sync`
Synchronize a local directory with Proton Drive (or vice versa).
```
Usage: rclone sync <source> <destination> [flags]
Args:
source (string, required) — Source path (local: or remote:)
dest (string, required) — Destination (local: or remote:)
direction (enum, default: "upload") — "upload" (local→Drive), "download" (Drive→local), "bidirectional"
dry_run (bool, default: true) — If true, show what would change without syncing
delete_excluded (bool, default: false) — Delete files at dest not present at source
Returns: JSON with {status, changed, added, deleted, errors}
```
**IMPORTANT:** Defaults to `dry_run=true` for safety. The agent MUST
confirm with the user before running a live sync, especially when
`delete_excluded=true` — this can cause data loss.
## Implementation
All tools are implemented as Python subprocess wrappers in the
`tools/` subdirectory. The primary backend is `rclone`; the
TypeScript Drive SDK is available as a fallback via
`PROTON_DRIVE_USE_SDK=true`.
### Tool Handler Pattern
```python
def handle_proton_drive_list(args: dict, **kwargs) -> dict:
path = args.get("path", "/")
recursive = args.get("recursive", False)
cmd = _build_rclone_command("lsf", "--json", path, recursive=recursive)
result = _run_rclone(cmd, timeout=30)
if result.returncode != 0:
return {"error": result.stderr.strip()}
items = [json.loads(line) for line in result.stdout.strip().split("\n") if line]
return {"items": items}
```
### Rclone Configuration Check
Before any tool execution, verify the rclone remote exists:
```python
def _check_rclone_remote(remote: str) -> bool:
result = subprocess.run(
["rclone", "listremotes"],
capture_output=True, text=True, timeout=5
)
return f"{remote}:" in result.stdout
```
If the remote is not configured, return a clear error with setup
instructions referencing the README.
## Error Handling
| Error | Cause | Resolution |
|-------|-------|------------|
| `remote not found` | protondrive remote not configured | Run `rclone config` or refer to README |
| `authentication required` | Token expired or invalid | Run `rclone config reconnect <remote>` |
| `file not found` | Path doesn't exist | Check path with `proton_drive_list` |
| `file too large` | Read attempt on >10MB file | Use `proton_drive_download` instead |
| `rate limited` | Too many API calls | Retry with backoff (rclone does this auto) |
## Security Notes
- Files read by `proton_drive_read` enter the agent's context. For
sensitive documents, prefer `proton_drive_download` and specify a
local path.
- `proton_drive_sync` with `delete_excluded=true` is destructive.
Always default to `dry_run=true` and require confirmation.
- rclone stores tokens in its config file
(`~/.config/rclone/rclone.conf`). Ensure this file has appropriate
file permissions (`chmod 600`).

View file

@ -0,0 +1,31 @@
"""Proton Drive skill — rclone-backed file operations for Hermes agents."""
from .tools import (
check_rclone_availability,
check_rclone_remote,
get_rclone_remote,
proton_drive_delete,
proton_drive_download,
proton_drive_list,
proton_drive_mkdir,
proton_drive_read,
proton_drive_search,
proton_drive_stat,
proton_drive_sync,
proton_drive_upload,
)
__all__ = [
"check_rclone_availability",
"check_rclone_remote",
"get_rclone_remote",
"proton_drive_delete",
"proton_drive_download",
"proton_drive_list",
"proton_drive_mkdir",
"proton_drive_read",
"proton_drive_search",
"proton_drive_stat",
"proton_drive_sync",
"proton_drive_upload",
]

View file

@ -0,0 +1,641 @@
"""Proton Drive tool implementations — subprocess wrappers around rclone protondrive.
All tools shell out to the rclone binary with a configured protondrive remote.
rclone handles all encryption, chunking, and API semantics transparently.
"""
import json
import os
import re
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DEFAULT_REMOTE = "protondrive"
RCLONE_MAX_READ_SIZE = 10 * 1024 * 1024 # 10 MB
RCLONE_TIMEOUT_LIST = 30
RCLONE_TIMEOUT_IO = 60
RCLONE_TIMEOUT_SYNC = 300
def _get_rclone_path() -> str:
"""Return the rclone binary path, checking env override first."""
return os.environ.get("PROTON_RCLONE_PATH", "rclone")
def _get_remote() -> str:
"""Return the configured protondrive remote name."""
return os.environ.get("PROTON_RCLONE_REMOTE", DEFAULT_REMOTE)
def _remote_path(path: str) -> str:
"""Join remote name and path into a single rclone argument."""
remote = _get_remote()
# Strip leading slash for path joining; rclone expects "remote:path"
clean = path.lstrip("/")
return f"{remote}:{clean}"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _run_rclone(
args: list[str],
timeout: int = RCLONE_TIMEOUT_LIST,
stdin: Optional[bytes] = None,
) -> subprocess.CompletedProcess:
"""Run rclone with the given args and return the CompletedProcess."""
rclone = _get_rclone_path()
cmd = [rclone] + args
try:
return subprocess.run(
cmd,
input=stdin,
capture_output=True,
text=True,
timeout=timeout,
)
except subprocess.TimeoutExpired:
return subprocess.CompletedProcess(
args=cmd,
returncode=-1,
stdout="",
stderr="rclone timed out after {timeout}s",
)
except FileNotFoundError:
return subprocess.CompletedProcess(
args=cmd,
returncode=-2,
stdout="",
stderr=f"rclone not found at '{rclone}'. Install: https://rclone.org/install",
)
def _parse_lsf_json(output: str) -> list[dict]:
"""Parse newline-delimited JSON output from `rclone lsf --json`."""
items = []
for line in output.strip().split("\n"):
line = line.strip()
if line:
try:
items.append(json.loads(line))
except json.JSONDecodeError:
items.append({"Name": line, "raw": True})
return items
def _parse_lsl_output(output: str) -> list[dict]:
"""Parse tabular output from `rclone lsl`."""
items = []
for line in output.strip().split("\n"):
line = line.strip()
if not line:
continue
# rclone lsl format: <size> <modtime> <path>
parts = line.split(maxsplit=2)
if len(parts) >= 3:
items.append({
"Size": parts[0],
"ModTime": parts[1],
"Path": parts[2],
})
elif len(parts) >= 1:
items.append({"Size": parts[0]})
return items
def _check_rclone_available() -> str | None:
"""Check if rclone binary is reachable. Returns None if OK, error string if not."""
rclone = _get_rclone_path()
try:
result = subprocess.run(
[rclone, "version"],
capture_output=True, text=True, timeout=5,
)
if result.returncode != 0:
return f"rclone execution failed: {result.stderr.strip()}"
return None
except FileNotFoundError:
return f"rclone not found at '{rclone}'. Install from https://rclone.org/install"
except subprocess.TimeoutExpired:
return "rclone version check timed out"
def _check_remote_exists(remote: str) -> str | None:
"""Check if the named remote is configured. Returns None if OK, error string if not."""
result = _run_rclone(["listremotes"], timeout=5)
if f"{remote}:" in result.stdout:
return None
return (
f"rclone remote '{remote}' not configured. "
f"Run: rclone config create {remote} protondrive username=your@proton.me"
)
# ---------------------------------------------------------------------------
# Public API / Tool Handlers
# ---------------------------------------------------------------------------
def check_rclone_availability() -> dict:
"""Verify rclone is installed and the protondrive remote is configured.
Returns:
dict with status summary and any errors found
"""
errors = []
rclone_err = _check_rclone_available()
if rclone_err:
return {"available": False, "errors": [rclone_err]}
remote = _get_remote()
remote_err = _check_remote_exists(remote)
if remote_err:
errors.append(remote_err)
return {
"available": len(errors) == 0,
"rclone": "ok",
"remote": remote,
"remote_configured": remote_err is None,
"errors": errors or None,
}
def check_rclone_remote() -> dict:
"""Check rclone remote configuration status. Legacy alias for environment checks."""
remote = _get_remote()
err = _check_remote_exists(remote)
return {
"remote": remote,
"configured": err is None,
"error": err,
}
def get_rclone_remote() -> str:
"""Return the currently configured rclone remote name."""
return _get_remote()
# ---------------------------------------------------------------------------
# proton_drive_list
# ---------------------------------------------------------------------------
def proton_drive_list(
path: str = "/",
recursive: bool = False,
dirs_only: bool = False,
files_only: bool = False,
) -> dict:
"""List files and folders at the given path on Proton Drive.
Args:
path: Directory path on Proton Drive (default: root "/")
recursive: List all subdirectories recursively
dirs_only: Show directories only
files_only: Show files only
Returns:
dict with items array or error
"""
args = ["lsf", "--json"]
if recursive:
args.append("-R")
if dirs_only:
args.append("--dirs-only")
if files_only:
args.append("--files-only")
rp = _remote_path(path)
args.append(rp)
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_LIST)
if result.returncode != 0:
return {"error": result.stderr.strip()}
items = _parse_lsf_json(result.stdout)
return {"items": items, "count": len(items), "path": path}
# ---------------------------------------------------------------------------
# proton_drive_read
# ---------------------------------------------------------------------------
def proton_drive_read(
path: str,
head: Optional[int] = None,
tail: Optional[int] = None,
) -> dict:
"""Read the contents of a text file from Proton Drive.
For files >10MB, returns an error suggesting proton_drive_download instead.
Args:
path: Path to the file on Proton Drive (e.g. "Documents/notes.txt")
head: If set, read only the first N lines
tail: If set, read only the last N lines
Returns:
dict with content or error
"""
# Check file size first via stat
stat_result = proton_drive_stat(path)
if "error" in stat_result:
return stat_result
# stat_result may have "Size" (from lsl) or "size" (from lsf --json)
raw_size = stat_result.get("Size") or stat_result.get("size") or 0
try:
size_int = int(raw_size)
except (ValueError, TypeError):
size_int = 0
if size_int > RCLONE_MAX_READ_SIZE:
return {
"error": (
f"File is {size_int} bytes, exceeding {RCLONE_MAX_READ_SIZE} byte "
f"inline read limit. Use proton_drive_download instead."
),
"size_bytes": size_int,
}
rp = _remote_path(path)
result = _run_rclone(["cat", rp], timeout=RCLONE_TIMEOUT_IO)
if result.returncode != 0:
return {"error": result.stderr.strip()}
content = result.stdout
if head is not None:
lines = content.split("\n")
content = "\n".join(lines[:head])
elif tail is not None:
lines = content.split("\n")
content = "\n".join(lines[-tail:])
return {
"content": content,
"size_bytes": len(content.encode("utf-8")),
"path": path,
}
# ---------------------------------------------------------------------------
# proton_drive_download
# ---------------------------------------------------------------------------
def proton_drive_download(
remote_path: str,
local_path: str,
progress: bool = False,
) -> dict:
"""Download a file from Proton Drive to the local filesystem.
Args:
remote_path: Source path on Proton Drive
local_path: Destination path on local filesystem (absolute or ~/expanded)
progress: Show rclone transfer progress output
Returns:
dict with status, local path, and size
"""
expanded_local = os.path.expanduser(local_path)
rp = _remote_path(remote_path)
# If local_path ends with /, treat as directory
if local_path.endswith("/") or local_path.endswith(os.sep):
os.makedirs(expanded_local, exist_ok=True)
dest = expanded_local
else:
# Ensure parent dir exists
parent = os.path.dirname(expanded_local)
if parent:
os.makedirs(parent, exist_ok=True)
dest = expanded_local
args = ["copy"]
if progress:
args.append("--progress")
args.extend([rp, dest])
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_IO)
if result.returncode != 0:
return {"error": result.stderr.strip()}
# Determine the actual file that was written
if os.path.isdir(dest):
filename = os.path.basename(remote_path.rstrip("/"))
actual_path = os.path.join(dest, filename)
else:
actual_path = dest
size = 0
if os.path.isfile(actual_path):
size = os.path.getsize(actual_path)
return {
"status": "downloaded",
"local_path": actual_path,
"size_bytes": size,
"remote_path": remote_path,
}
# ---------------------------------------------------------------------------
# proton_drive_upload
# ---------------------------------------------------------------------------
def proton_drive_upload(
local_path: str,
remote_path: str,
create_parents: bool = True,
progress: bool = False,
) -> dict:
"""Upload a file or directory from the local filesystem to Proton Drive.
Args:
local_path: Source path on local filesystem
remote_path: Destination path on Proton Drive
create_parents: Create parent dirs if they don't exist (default: True)
progress: Show rclone transfer progress output
Returns:
dict with status, remote path, and size
"""
expanded_local = os.path.expanduser(local_path)
if not os.path.exists(expanded_local):
return {"error": f"Local path '{expanded_local}' does not exist"}
if create_parents:
# Ensure the remote parent path exists by creating it
parent_remote = remote_path.rstrip("/")
# If remote_path looks like a file path, get its parent dir
if not remote_path.endswith("/"):
parent_dir = os.path.dirname(remote_path) or "/"
else:
parent_dir = remote_path
mkdir_rp = _remote_path(parent_dir)
_run_rclone(["mkdir", mkdir_rp], timeout=RCLONE_TIMEOUT_LIST)
rp = _remote_path(remote_path)
args = ["copy"]
if progress:
args.append("--progress")
args.extend([expanded_local, rp])
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_IO)
if result.returncode != 0:
return {"error": result.stderr.strip()}
file_size = 0
if os.path.isfile(expanded_local):
file_size = os.path.getsize(expanded_local)
return {
"status": "uploaded",
"remote_path": remote_path,
"local_path": expanded_local,
"size_bytes": file_size,
}
# ---------------------------------------------------------------------------
# proton_drive_search
# ---------------------------------------------------------------------------
def proton_drive_search(
query: str,
path: str = "/",
regex: bool = False,
max_results: int = 50,
) -> dict:
"""Search for files by name across Proton Drive.
Uses `rclone lsf -R --files-only --json` piped through a name filter.
Args:
query: Search term (substring) or regex pattern
path: Root path to search under (default: "/")
regex: If True, treat query as regex instead of substring match
max_results: Maximum number of results to return
Returns:
dict with matching items array
"""
rp = _remote_path(path)
result = _run_rclone(
["lsf", "-R", "--files-only", "--json", rp],
timeout=RCLONE_TIMEOUT_LIST,
)
if result.returncode != 0:
return {"error": result.stderr.strip()}
all_items = _parse_lsf_json(result.stdout)
if regex:
try:
pattern = re.compile(query, re.IGNORECASE)
except re.error as e:
return {"error": f"Invalid regex: {e}"}
matches = [it for it in all_items if pattern.search(it.get("Name", ""))]
else:
q = query.lower()
matches = [it for it in all_items if q in it.get("Name", "").lower()]
limited = matches[:max_results]
return {
"items": limited,
"count": len(limited),
"total_matches": len(matches),
"query": query,
"path": path,
}
# ---------------------------------------------------------------------------
# proton_drive_mkdir
# ---------------------------------------------------------------------------
def proton_drive_mkdir(path: str) -> dict:
"""Create a folder on Proton Drive.
Creates all parent directories if they don't exist (like mkdir -p).
Args:
path: Path of folder to create (e.g. "Documents/NewFolder")
Returns:
dict with status
"""
rp = _remote_path(path)
result = _run_rclone(["mkdir", rp], timeout=RCLONE_TIMEOUT_LIST)
if result.returncode != 0:
return {"error": result.stderr.strip()}
return {"status": "created", "path": path}
# ---------------------------------------------------------------------------
# proton_drive_delete
# ---------------------------------------------------------------------------
def proton_drive_delete(path: str, recursive: bool = False) -> dict:
"""Delete a file or folder from Proton Drive.
Non-recursive deletion of non-empty folders will fail.
Args:
path: Path to the file or folder to delete
recursive: Recursively delete folder contents
Returns:
dict with status
"""
rp = _remote_path(path)
if recursive:
# rclone purge removes a directory and all its contents
result = _run_rclone(["purge", rp], timeout=RCLONE_TIMEOUT_IO)
else:
# rclone delete removes files only (fails on non-empty dirs)
result = _run_rclone(["delete", rp], timeout=RCLONE_TIMEOUT_IO)
if result.returncode != 0:
return {"error": result.stderr.strip()}
return {"status": "deleted", "path": path, "recursive": recursive}
# ---------------------------------------------------------------------------
# proton_drive_stat
# ---------------------------------------------------------------------------
def proton_drive_stat(path: str) -> dict:
"""Get detailed metadata for a file or folder.
Args:
path: Path to the file or folder on Proton Drive
Returns:
dict with metadata (Name, Path, Size, ModTime, IsDir, etc.)
"""
# Use lsl for size + modtime, then lsf --json for structured info
rp = _remote_path(path)
lsl_result = _run_rclone(["lsl", rp], timeout=RCLONE_TIMEOUT_LIST)
if lsl_result.returncode != 0:
# Try lsf instead (might be a directory)
lsf_result = _run_rclone(
["lsf", "--json", rp], timeout=RCLONE_TIMEOUT_LIST
)
if lsf_result.returncode != 0:
return {"error": lsf_result.stderr.strip()}
items = _parse_lsf_json(lsf_result.stdout)
if items:
return items[0]
return {"path": path, "IsDir": True}
parsed = _parse_lsl_output(lsl_result.stdout)
if parsed:
info = parsed[0]
info["path"] = path
info["IsDir"] = False
return info
return {"path": path, "error": "no metadata returned"}
# ---------------------------------------------------------------------------
# proton_drive_sync
# ---------------------------------------------------------------------------
def proton_drive_sync(
source: str,
dest: str,
dry_run: bool = True,
delete_excluded: bool = False,
) -> dict:
"""Synchronize a local directory with Proton Drive.
WARNING: Defaults to dry_run=True. Must confirm before live sync,
especially with delete_excluded=True.
Args:
source: Source path (local:/path or remote:path)
dest: Destination path (local:/path or remote:path)
dry_run: If True, show what would change without applying (default: True)
delete_excluded: Delete files at dest not present at source
Returns:
dict with sync report
"""
args = ["sync"]
if dry_run:
args.append("--dry-run")
if delete_excluded:
args.append("--delete-excluded")
# Auto-expand ~ in local paths
src = source if ":" in source else os.path.expanduser(source)
dst = dest if ":" in dest else os.path.expanduser(dest)
args.extend([src, dst])
result = _run_rclone(args, timeout=RCLONE_TIMEOUT_SYNC)
if result.returncode != 0:
return {"error": result.stderr.strip()}
# Parse output for summary information
summary = _parse_sync_output(result.stdout, result.stderr)
return {
"status": "dry_run" if dry_run else "synced",
"source": source,
"dest": dest,
"dry_run": dry_run,
"delete_excluded": delete_excluded,
**summary,
}
def _parse_sync_output(stdout: str, stderr: str) -> dict:
"""Extract transfer summary from rclone sync output."""
changes = []
errors = []
for line in stdout.split("\n"):
line = line.strip()
if line:
changes.append(line)
# rclone reports errors on stderr
for line in stderr.split("\n"):
line = line.strip()
if line and ("ERROR" in line.upper() or "failed" in line.lower()):
errors.append(line)
return {
"changes_log": changes,
"errors": errors or None,
"change_count": len(changes),
"error_count": len(errors),
}

277
tests/test_drive.py Normal file
View file

@ -0,0 +1,277 @@
"""Tests for the Proton Drive skill — all 9 rclone-backed tool handlers.
Uses unittest with monkeypatched subprocess.run to avoid needing rclone.
"""
import json
import os
import unittest
from unittest.mock import patch, MagicMock
from types import SimpleNamespace
# Load the tools module directly (hyphenated dir can't be imported as a package)
import importlib.machinery
import importlib.util
import sys
TOOLS_PATH = os.path.join(
os.path.dirname(__file__), "..", "skills", "proton-drive", "tools.py"
)
loader = importlib.machinery.SourceFileLoader("proton_drive_tools", os.path.abspath(TOOLS_PATH))
spec = importlib.util.spec_from_loader("proton_drive_tools", loader, origin=os.path.abspath(TOOLS_PATH))
DRIVE = importlib.util.module_from_spec(spec)
loader.exec_module(DRIVE)
# Register in sys.modules so unittest.mock.patch can resolve dotted paths
sys.modules["proton_drive_tools"] = DRIVE
# The mock target prefix matches the module name used during load
MOD = "proton_drive_tools"
def _fake_run(returncode=0, stdout="", stderr=""):
"""Create a mock subprocess.run replacement."""
def fake_run(*args, **kwargs):
return SimpleNamespace(
returncode=returncode,
stdout=stdout,
stderr=stderr,
args=args,
)
return fake_run
class TestProtonDriveList(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_list_root(self, mock_run):
mock_run.return_value = SimpleNamespace(
returncode=0,
stdout=json.dumps({"Name": "Documents", "IsDir": True}) + "\n"
+ json.dumps({"Name": "notes.txt", "Size": 1024, "IsDir": False}),
stderr="",
)
result = DRIVE.proton_drive_list()
self.assertIn("items", result)
self.assertEqual(result["count"], 2)
self.assertEqual(result["items"][0]["Name"], "Documents")
self.assertEqual(result["items"][1]["Name"], "notes.txt")
@patch(f"{MOD}.subprocess.run")
def test_list_recursive(self, mock_run):
mock_run.return_value = SimpleNamespace(
returncode=0,
stdout=json.dumps({"Name": "sub/file.pdf", "Size": 2048, "IsDir": False}) + "\n",
stderr="",
)
result = DRIVE.proton_drive_list(path="Documents", recursive=True)
self.assertEqual(result["items"][0]["Name"], "sub/file.pdf")
def test_default_remote(self):
self.assertEqual(DRIVE._get_remote(), "protondrive")
def test_remote_path(self):
path = DRIVE._remote_path("Docs/File.txt")
self.assertEqual(path, "protondrive:Docs/File.txt")
class TestProtonDriveRead(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_read_file(self, mock_run):
def side_effect(*args, **kwargs):
cmd = args[0]
if "lsl" in cmd:
return SimpleNamespace(returncode=0, stdout="5000 2026-01-01 12:00:00 notes.txt", stderr="")
elif "cat" in cmd:
return SimpleNamespace(returncode=0, stdout="Hello, Proton Drive!\nLine 2\nLine 3", stderr="")
return SimpleNamespace(returncode=0, stdout="", stderr="")
mock_run.side_effect = side_effect
result = DRIVE.proton_drive_read("notes.txt")
self.assertIn("content", result)
self.assertIn("Proton Drive", result["content"])
@patch(f"{MOD}.subprocess.run")
def test_read_large_file_redirects(self, mock_run):
mock_run.return_value = SimpleNamespace(
returncode=0,
stdout="15000000 2026-01-01 12:00:00 big_file.mp4",
stderr="",
)
result = DRIVE.proton_drive_read("big_file.mp4")
self.assertIn("error", result)
self.assertIn("proton_drive_download", result["error"])
@patch(f"{MOD}.subprocess.run")
def test_read_head(self, mock_run):
def side_effect(*args, **kwargs):
cmd = args[0]
if "lsl" in cmd:
return SimpleNamespace(returncode=0, stdout="200 2026-01-01 12:00:00 file.txt", stderr="")
elif "cat" in cmd:
return SimpleNamespace(returncode=0, stdout="a\nb\nc\nd\ne", stderr="")
return SimpleNamespace(returncode=0, stdout="", stderr="")
mock_run.side_effect = side_effect
result = DRIVE.proton_drive_read("file.txt", head=2)
lines = result["content"].split("\n")
self.assertEqual(len(lines), 2)
class TestProtonDriveDownload(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
@patch(f"{MOD}.os.path.isfile")
@patch(f"{MOD}.os.path.getsize")
def test_download(self, mock_size, mock_isfile, mock_run):
mock_run.return_value = SimpleNamespace(returncode=0, stdout="", stderr="")
mock_isfile.return_value = True
mock_size.return_value = 1024
result = DRIVE.proton_drive_download("remote/file.pdf", "/tmp/out.pdf")
self.assertEqual(result["status"], "downloaded")
self.assertEqual(result["size_bytes"], 1024)
class TestProtonDriveUpload(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
@patch(f"{MOD}.os.path.exists")
@patch(f"{MOD}.os.path.isfile")
@patch(f"{MOD}.os.path.getsize")
def test_upload(self, mock_size, mock_isfile, mock_exists, mock_run):
mock_exists.return_value = True
mock_isfile.return_value = True
mock_size.return_value = 8192
mock_run.return_value = SimpleNamespace(returncode=0, stdout="", stderr="")
result = DRIVE.proton_drive_upload("/tmp/mydoc.pdf", "Documents/mydoc.pdf")
self.assertEqual(result["status"], "uploaded")
def test_upload_missing_file(self):
result = DRIVE.proton_drive_upload("/nonexistent/file.txt", "remote/")
self.assertIn("error", result)
class TestProtonDriveSearch(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_search(self, mock_run):
mock_run.return_value = SimpleNamespace(
returncode=0,
stdout=json.dumps({"Name": "report_q1.pdf", "Size": 5000}) + "\n"
+ json.dumps({"Name": "report_q2.pdf", "Size": 6000}) + "\n"
+ json.dumps({"Name": "notes.txt", "Size": 200}),
stderr="",
)
result = DRIVE.proton_drive_search("report")
self.assertEqual(result["count"], 2)
self.assertEqual(result["items"][0]["Name"], "report_q1.pdf")
class TestProtonDriveMkdir(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_mkdir(self, mock_run):
mock_run.return_value = SimpleNamespace(returncode=0, stdout="", stderr="")
result = DRIVE.proton_drive_mkdir("Documents/NewFolder")
self.assertEqual(result["status"], "created")
class TestProtonDriveDelete(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_delete_file(self, mock_run):
mock_run.return_value = SimpleNamespace(returncode=0, stdout="", stderr="")
result = DRIVE.proton_drive_delete("old_file.txt")
self.assertEqual(result["status"], "deleted")
@patch(f"{MOD}.subprocess.run")
def test_delete_recursive(self, mock_run):
mock_run.return_value = SimpleNamespace(returncode=0, stdout="", stderr="")
result = DRIVE.proton_drive_delete("OldFolder", recursive=True)
self.assertTrue(result["recursive"])
class TestProtonDriveStat(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_stat_file(self, mock_run):
mock_run.return_value = SimpleNamespace(
returncode=0,
stdout="1500 2026-06-01 10:30:00 file.txt",
stderr="",
)
result = DRIVE.proton_drive_stat("file.txt")
self.assertEqual(result["Size"], "1500")
class TestProtonDriveSync(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_sync_dry_run(self, mock_run):
mock_run.return_value = SimpleNamespace(
returncode=0,
stdout="file1.txt\nfile2.txt",
stderr="",
)
result = DRIVE.proton_drive_sync(
source="/tmp/mydir",
dest="protondrive:backup/",
dry_run=True,
)
self.assertEqual(result["status"], "dry_run")
self.assertTrue(result["dry_run"])
class TestRcloneHelpers(unittest.TestCase):
@patch(f"{MOD}.subprocess.run")
def test_check_available(self, mock_run):
mock_run.return_value = SimpleNamespace(returncode=0, stdout="rclone v1.69", stderr="")
result = DRIVE.check_rclone_availability()
self.assertIn("available", result)
@patch(f"{MOD}.subprocess.run")
def test_check_remote_missing(self, mock_run):
mock_run.return_value = SimpleNamespace(returncode=0, stdout="otherremote:\n", stderr="")
result = DRIVE.check_rclone_remote()
self.assertFalse(result["configured"])
@patch.dict(os.environ, {"PROTON_RCLONE_REMOTE": "customremote"}, clear=False)
def test_custom_remote(self):
self.assertEqual(DRIVE._get_remote(), "customremote")
@patch.dict(os.environ, {"PROTON_RCLONE_PATH": "/usr/local/bin/rclone"}, clear=False)
def test_custom_rclone_path(self):
self.assertEqual(DRIVE._get_rclone_path(), "/usr/local/bin/rclone")
def test_get_rclone_remote(self):
self.assertEqual(DRIVE.get_rclone_remote(), "protondrive")
class TestParseHelpers(unittest.TestCase):
def test_parse_lsf_json(self):
output = json.dumps({"Name": "a.txt", "Size": 10}) + "\n" + json.dumps({"Name": "b.txt", "Size": 20})
items = DRIVE._parse_lsf_json(output)
self.assertEqual(len(items), 2)
def test_parse_lsf_json_empty(self):
items = DRIVE._parse_lsf_json("")
self.assertEqual(items, [])
def test_parse_lsl_output(self):
output = "1024 2026-01-01 12:00:00 file.txt\n2048 2026-06-01 15:00:00 other.pdf"
items = DRIVE._parse_lsl_output(output)
self.assertEqual(len(items), 2)
self.assertEqual(items[0]["Size"], "1024")
def test_parse_sync_output(self):
result = DRIVE._parse_sync_output("file1.txt\nfile2.txt", "ERROR on file3.txt")
self.assertEqual(result["change_count"], 2)
self.assertEqual(result["error_count"], 1)
if __name__ == "__main__":
unittest.main()