app.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969
  1. import modal
  2. import json
  3. import logging
  4. from datetime import datetime, timezone
  5. from pathlib import Path
  6. from typing import List
  7. from fastapi import FastAPI, HTTPException, Header, Security, Request
  8. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  9. from fastapi.responses import JSONResponse, HTMLResponse
  10. from typing import Optional
  11. security = HTTPBearer()
  12. from models import (
  13. RecurringScheduleCreate,
  14. RecurringSchedule,
  15. OneTimeScheduleCreate,
  16. OneTimeSchedule,
  17. )
  18. from scheduler import is_recurring_schedule_due, is_onetime_schedule_due
  19. from letta_executor import execute_letta_message, validate_api_key
  20. from crypto_utils import get_api_key_hash, get_encryption_key, encrypt_json, decrypt_json
  21. from dateutil import parser as date_parser
  22. logging.basicConfig(level=logging.INFO)
  23. logger = logging.getLogger(__name__)
  24. app = modal.App("switchboard")
  25. import os as local_os
  26. # Read dev mode setting from local environment
  27. dev_mode_enabled = local_os.getenv("LETTA_SWITCHBOARD_DEV_MODE", "false")
  28. image = (
  29. modal.Image.debian_slim()
  30. .pip_install_from_requirements("requirements.txt")
  31. .env({"LETTA_SWITCHBOARD_DEV_MODE": dev_mode_enabled}) # Must come before add_local_*
  32. .add_local_python_source("models", "scheduler", "letta_executor", "crypto_utils")
  33. .add_local_file("dashboard.html", "/root/dashboard.html")
  34. )
  35. volume = modal.Volume.from_name("letta-switchboard-volume", create_if_missing=True)
  36. try:
  37. encryption_secret = modal.Secret.from_name("letta-switchboard-encryption")
  38. except Exception:
  39. logger.warning("letta-switchboard-encryption secret not found, will use env var or generate temporary key")
  40. encryption_secret = None
  41. VOLUME_PATH = "/data"
  42. SCHEDULES_BASE = f"{VOLUME_PATH}/schedules"
  43. RESULTS_BASE = f"{VOLUME_PATH}/results"
  44. web_app = FastAPI()
  45. # Lazy-load encryption key (will check env vars at runtime)
  46. _encryption_key = None
  47. def get_encryption_key_cached():
  48. global _encryption_key
  49. if _encryption_key is None:
  50. _encryption_key = get_encryption_key()
  51. return _encryption_key
  52. def get_recurring_schedule_path(api_key: str, schedule_id: str) -> str:
  53. """Get file path for recurring schedule."""
  54. api_key_hash = get_api_key_hash(api_key)
  55. return f"{SCHEDULES_BASE}/recurring/{api_key_hash}/{schedule_id}.json"
  56. def get_onetime_schedule_path(api_key: str, execute_at: str, schedule_id: str) -> str:
  57. """Get file path for one-time schedule with time bucketing."""
  58. api_key_hash = get_api_key_hash(api_key)
  59. dt = date_parser.parse(execute_at)
  60. date_str = dt.strftime("%Y-%m-%d")
  61. hour_str = dt.strftime("%H")
  62. return f"{SCHEDULES_BASE}/one-time/{date_str}/{hour_str}/{api_key_hash}/{schedule_id}.json"
  63. def save_schedule(file_path: str, schedule_data: dict):
  64. """Save encrypted schedule to file."""
  65. Path(file_path).parent.mkdir(parents=True, exist_ok=True)
  66. encrypted_data = encrypt_json(schedule_data, get_encryption_key_cached())
  67. with open(file_path, "wb") as f:
  68. f.write(encrypted_data)
  69. volume.commit()
  70. def load_schedule(file_path: str) -> dict:
  71. """Load and decrypt schedule from file."""
  72. try:
  73. with open(file_path, "rb") as f:
  74. encrypted_data = f.read()
  75. return decrypt_json(encrypted_data, get_encryption_key_cached())
  76. except FileNotFoundError:
  77. return None
  78. def delete_schedule(file_path: str):
  79. """Delete schedule file."""
  80. try:
  81. Path(file_path).unlink()
  82. volume.commit()
  83. return True
  84. except FileNotFoundError:
  85. return False
  86. def list_recurring_schedules_for_user(api_key: str) -> List[dict]:
  87. """List all recurring schedules for a specific user."""
  88. api_key_hash = get_api_key_hash(api_key)
  89. user_dir = f"{SCHEDULES_BASE}/recurring/{api_key_hash}"
  90. schedules = []
  91. if not Path(user_dir).exists():
  92. return schedules
  93. for file_path in Path(user_dir).glob("*.json"):
  94. try:
  95. with open(file_path, "rb") as f:
  96. encrypted_data = f.read()
  97. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  98. schedules.append(schedule)
  99. except Exception as e:
  100. logger.error(f"Failed to load schedule {file_path}: {e}")
  101. return schedules
  102. def list_onetime_schedules_for_user(api_key: str) -> List[dict]:
  103. """List all one-time schedules for a specific user."""
  104. api_key_hash = get_api_key_hash(api_key)
  105. base_dir = f"{SCHEDULES_BASE}/one-time"
  106. schedules = []
  107. if not Path(base_dir).exists():
  108. return schedules
  109. # Traverse all date/hour buckets
  110. for date_dir in Path(base_dir).iterdir():
  111. if not date_dir.is_dir():
  112. continue
  113. for hour_dir in date_dir.iterdir():
  114. if not hour_dir.is_dir():
  115. continue
  116. user_dir = hour_dir / api_key_hash
  117. if not user_dir.exists():
  118. continue
  119. for file_path in user_dir.glob("*.json"):
  120. try:
  121. with open(file_path, "rb") as f:
  122. encrypted_data = f.read()
  123. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  124. schedules.append(schedule)
  125. except Exception as e:
  126. logger.error(f"Failed to load schedule {file_path}: {e}")
  127. return schedules
  128. def list_all_recurring_schedules() -> List[dict]:
  129. """List all recurring schedules across all users (for cron job)."""
  130. schedules = []
  131. recurring_dir = f"{SCHEDULES_BASE}/recurring"
  132. if not Path(recurring_dir).exists():
  133. return schedules
  134. for user_dir in Path(recurring_dir).iterdir():
  135. if not user_dir.is_dir():
  136. continue
  137. for file_path in user_dir.glob("*.json"):
  138. try:
  139. with open(file_path, "rb") as f:
  140. encrypted_data = f.read()
  141. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  142. schedules.append(schedule)
  143. except Exception as e:
  144. logger.error(f"Failed to load schedule {file_path}: {e}")
  145. return schedules
  146. def list_onetime_schedules_for_time(date_str: str, hour_str: str) -> List[dict]:
  147. """List all one-time schedules for a specific date/hour (for cron job)."""
  148. schedules = []
  149. time_dir = f"{SCHEDULES_BASE}/one-time/{date_str}/{hour_str}"
  150. if not Path(time_dir).exists():
  151. return schedules
  152. for user_dir in Path(time_dir).iterdir():
  153. if not user_dir.is_dir():
  154. continue
  155. for file_path in user_dir.glob("*.json"):
  156. try:
  157. with open(file_path, "rb") as f:
  158. encrypted_data = f.read()
  159. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  160. schedules.append(schedule)
  161. except Exception as e:
  162. logger.error(f"Failed to load schedule {file_path}: {e}")
  163. return schedules
  164. def find_onetime_schedule_for_user(api_key: str, schedule_id: str) -> tuple[dict, str]:
  165. """Find a one-time schedule by ID for a specific user. Returns (schedule, file_path)."""
  166. api_key_hash = get_api_key_hash(api_key)
  167. base_dir = f"{SCHEDULES_BASE}/one-time"
  168. if not Path(base_dir).exists():
  169. return None, None
  170. # Search through all time buckets for this user
  171. for date_dir in Path(base_dir).iterdir():
  172. if not date_dir.is_dir():
  173. continue
  174. for hour_dir in date_dir.iterdir():
  175. if not hour_dir.is_dir():
  176. continue
  177. user_dir = hour_dir / api_key_hash
  178. if not user_dir.exists():
  179. continue
  180. file_path = user_dir / f"{schedule_id}.json"
  181. if file_path.exists():
  182. try:
  183. with open(file_path, "rb") as f:
  184. encrypted_data = f.read()
  185. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  186. return schedule, str(file_path)
  187. except Exception as e:
  188. logger.error(f"Failed to load schedule {file_path}: {e}")
  189. return None, None
  190. @web_app.get("/")
  191. async def root(request: Request):
  192. """Landing page with usage instructions."""
  193. # Check if browser is requesting HTML
  194. accept_header = request.headers.get("accept", "")
  195. wants_html = "text/html" in accept_header
  196. info = {
  197. "service": "Letta Switchboard",
  198. "description": "Free hosted message routing service for Letta agents",
  199. "version": "1.0.0",
  200. "documentation": "https://github.com/cpfiffer/letta-switchboard",
  201. "features": [
  202. "Send messages immediately or scheduled for later",
  203. "Recurring schedules with cron expressions",
  204. "Secure API key isolation",
  205. "Execution tracking with run IDs"
  206. ],
  207. "quick_start": {
  208. "curl_example": {
  209. "one_time": "curl -X POST https://letta--switchboard-api.modal.run/schedules/one-time -H 'Authorization: Bearer YOUR_LETTA_API_KEY' -H 'Content-Type: application/json' -d '{\"agent_id\": \"agent-xxx\", \"execute_at\": \"2025-11-13T09:00:00Z\", \"message\": \"Hello!\"}'",
  210. "recurring": "curl -X POST https://letta--switchboard-api.modal.run/schedules/recurring -H 'Authorization: Bearer YOUR_LETTA_API_KEY' -H 'Content-Type: application/json' -d '{\"agent_id\": \"agent-xxx\", \"cron\": \"0 9 * * 1-5\", \"message\": \"Daily standup\"}'"
  211. },
  212. "cli": {
  213. "install": "git clone https://github.com/cpfiffer/letta-switchboard.git && cd letta-switchboard/cli && go build -o letta-switchboard",
  214. "configure": "./letta-switchboard config set-api-key YOUR_LETTA_API_KEY",
  215. "send": "./letta-switchboard send --agent-id agent-xxx --message 'Hello!'",
  216. "schedule": "./letta-switchboard send --agent-id agent-xxx --message 'Reminder' --execute-at 'tomorrow at 9am'",
  217. "recurring": "./letta-switchboard recurring create --agent-id agent-xxx --message 'Daily standup' --cron 'every weekday'"
  218. }
  219. },
  220. "endpoints": {
  221. "POST /schedules/one-time": "Create a one-time schedule",
  222. "POST /schedules/recurring": "Create a recurring schedule",
  223. "GET /schedules/one-time": "List your one-time schedules",
  224. "GET /schedules/recurring": "List your recurring schedules",
  225. "GET /schedules/one-time/{id}": "Get specific one-time schedule",
  226. "GET /schedules/recurring/{id}": "Get specific recurring schedule",
  227. "DELETE /schedules/one-time/{id}": "Delete one-time schedule",
  228. "DELETE /schedules/recurring/{id}": "Delete recurring schedule",
  229. "GET /results": "List execution results",
  230. "GET /results/{schedule_id}": "Get result for specific schedule"
  231. },
  232. "authentication": "All endpoints require 'Authorization: Bearer YOUR_LETTA_API_KEY' header",
  233. "support": "https://github.com/cpfiffer/letta-switchboard/issues"
  234. }
  235. if wants_html:
  236. html_content = """
  237. <!DOCTYPE html>
  238. <html>
  239. <head>
  240. <title>Letta Switchboard</title>
  241. <meta charset="UTF-8">
  242. <meta name="viewport" content="width=device-width, initial-scale=1.0">
  243. <style>
  244. * { box-sizing: border-box; }
  245. body {
  246. font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
  247. background: #f8f9fa;
  248. margin: 0;
  249. padding: 24px;
  250. line-height: 1.6;
  251. color: #1a1a1a;
  252. }
  253. .container {
  254. max-width: 720px;
  255. margin: 0 auto;
  256. }
  257. h1 {
  258. font-size: 28px;
  259. font-weight: 600;
  260. margin: 0 0 8px 0;
  261. }
  262. .subtitle {
  263. color: #666;
  264. margin-bottom: 24px;
  265. }
  266. .status {
  267. display: inline-flex;
  268. align-items: center;
  269. gap: 6px;
  270. background: #e8f5e9;
  271. color: #2e7d32;
  272. padding: 4px 10px;
  273. border-radius: 4px;
  274. font-size: 13px;
  275. font-weight: 500;
  276. margin-bottom: 24px;
  277. }
  278. .status::before {
  279. content: '';
  280. width: 8px;
  281. height: 8px;
  282. background: #4caf50;
  283. border-radius: 50%;
  284. }
  285. .dashboard-btn {
  286. display: inline-block;
  287. background: #1a1a1a;
  288. color: white;
  289. padding: 12px 24px;
  290. text-decoration: none;
  291. border-radius: 6px;
  292. font-weight: 500;
  293. margin-bottom: 32px;
  294. }
  295. .dashboard-btn:hover {
  296. background: #333;
  297. }
  298. h2 {
  299. font-size: 18px;
  300. font-weight: 600;
  301. margin: 32px 0 12px 0;
  302. padding-bottom: 8px;
  303. border-bottom: 1px solid #e0e0e0;
  304. }
  305. .features {
  306. display: grid;
  307. grid-template-columns: repeat(auto-fit, minmax(280px, 1fr));
  308. gap: 12px;
  309. margin-bottom: 8px;
  310. }
  311. .feature {
  312. background: white;
  313. padding: 12px 16px;
  314. border-radius: 6px;
  315. border: 1px solid #e0e0e0;
  316. }
  317. .feature-title {
  318. font-weight: 600;
  319. margin-bottom: 2px;
  320. }
  321. .feature-desc {
  322. color: #666;
  323. font-size: 14px;
  324. }
  325. pre {
  326. background: #1a1a1a;
  327. color: #e0e0e0;
  328. padding: 16px;
  329. border-radius: 6px;
  330. overflow-x: auto;
  331. font-family: 'SF Mono', Consolas, monospace;
  332. font-size: 13px;
  333. line-height: 1.5;
  334. margin: 12px 0;
  335. }
  336. pre .comment { color: #6a9955; }
  337. pre .string { color: #ce9178; }
  338. code {
  339. background: #f0f0f0;
  340. padding: 2px 6px;
  341. border-radius: 3px;
  342. font-family: 'SF Mono', Consolas, monospace;
  343. font-size: 13px;
  344. }
  345. .endpoints {
  346. background: white;
  347. border: 1px solid #e0e0e0;
  348. border-radius: 6px;
  349. overflow: hidden;
  350. }
  351. .endpoint {
  352. display: flex;
  353. padding: 10px 16px;
  354. border-bottom: 1px solid #e0e0e0;
  355. font-size: 14px;
  356. }
  357. .endpoint:last-child { border-bottom: none; }
  358. .endpoint-method {
  359. font-family: 'SF Mono', Consolas, monospace;
  360. font-weight: 600;
  361. width: 220px;
  362. flex-shrink: 0;
  363. }
  364. .endpoint-desc {
  365. color: #666;
  366. }
  367. .note {
  368. background: #fff3e0;
  369. border-left: 3px solid #ff9800;
  370. padding: 12px 16px;
  371. margin: 16px 0;
  372. border-radius: 0 6px 6px 0;
  373. }
  374. .links {
  375. display: flex;
  376. gap: 24px;
  377. flex-wrap: wrap;
  378. }
  379. .links a {
  380. color: #1a1a1a;
  381. text-decoration: none;
  382. font-weight: 500;
  383. }
  384. .links a:hover {
  385. text-decoration: underline;
  386. }
  387. @media (max-width: 600px) {
  388. .endpoint { flex-direction: column; gap: 4px; }
  389. .endpoint-method { width: auto; }
  390. }
  391. </style>
  392. </head>
  393. <body>
  394. <div class="container">
  395. <h1>Letta Switchboard</h1>
  396. <p class="subtitle">Message scheduling and routing for Letta agents</p>
  397. <div class="status">Operational</div>
  398. <br>
  399. <a href="/dashboard" class="dashboard-btn">Open Dashboard</a>
  400. <h2>Features</h2>
  401. <div class="features">
  402. <div class="feature">
  403. <div class="feature-title">Scheduled Messages</div>
  404. <div class="feature-desc">Send messages now or schedule for later</div>
  405. </div>
  406. <div class="feature">
  407. <div class="feature-title">Recurring Schedules</div>
  408. <div class="feature-desc">Cron expressions for repeated delivery</div>
  409. </div>
  410. <div class="feature">
  411. <div class="feature-title">API Key Isolation</div>
  412. <div class="feature-desc">Secure per-user schedule storage</div>
  413. </div>
  414. <div class="feature">
  415. <div class="feature-title">Execution Tracking</div>
  416. <div class="feature-desc">Track results with run IDs</div>
  417. </div>
  418. </div>
  419. <h2>Quick Start</h2>
  420. <p>Schedule a one-time message:</p>
  421. <pre>curl -X POST https://letta--switchboard-api.modal.run/schedules/one-time \\
  422. -H 'Authorization: Bearer YOUR_LETTA_API_KEY' \\
  423. -H 'Content-Type: application/json' \\
  424. -d '{
  425. <span class="string">"agent_id"</span>: <span class="string">"agent-xxx"</span>,
  426. <span class="string">"execute_at"</span>: <span class="string">"2025-12-09T09:00:00Z"</span>,
  427. <span class="string">"message"</span>: <span class="string">"Hello from Switchboard!"</span>
  428. }'</pre>
  429. <p>Create a recurring schedule:</p>
  430. <pre>curl -X POST https://letta--switchboard-api.modal.run/schedules/recurring \\
  431. -H 'Authorization: Bearer YOUR_LETTA_API_KEY' \\
  432. -H 'Content-Type: application/json' \\
  433. -d '{
  434. <span class="string">"agent_id"</span>: <span class="string">"agent-xxx"</span>,
  435. <span class="string">"cron"</span>: <span class="string">"0 9 * * 1-5"</span>,
  436. <span class="string">"message"</span>: <span class="string">"Daily standup reminder"</span>
  437. }'</pre>
  438. <h2>CLI Tool</h2>
  439. <p>Natural language scheduling from your terminal:</p>
  440. <pre><span class="comment"># Install</span>
  441. git clone https://github.com/cpfiffer/letta-switchboard.git
  442. cd letta-switchboard/cli && go build -o letta-schedules
  443. ./letta-schedules config set-api-key YOUR_LETTA_API_KEY
  444. <span class="comment"># Send a message</span>
  445. ./letta-schedules send --agent-id agent-xxx --message "Hello!"
  446. <span class="comment"># Schedule with natural language</span>
  447. ./letta-schedules send --agent-id agent-xxx --message "Reminder" --at "tomorrow 9am"
  448. <span class="comment"># Create recurring schedule</span>
  449. ./letta-schedules recurring create --agent-id agent-xxx --cron "every weekday" --message "Standup"</pre>
  450. <h2>API Endpoints</h2>
  451. <div class="endpoints">
  452. <div class="endpoint">
  453. <span class="endpoint-method">POST /schedules/one-time</span>
  454. <span class="endpoint-desc">Create one-time schedule</span>
  455. </div>
  456. <div class="endpoint">
  457. <span class="endpoint-method">POST /schedules/recurring</span>
  458. <span class="endpoint-desc">Create recurring schedule</span>
  459. </div>
  460. <div class="endpoint">
  461. <span class="endpoint-method">GET /schedules/one-time</span>
  462. <span class="endpoint-desc">List one-time schedules</span>
  463. </div>
  464. <div class="endpoint">
  465. <span class="endpoint-method">GET /schedules/recurring</span>
  466. <span class="endpoint-desc">List recurring schedules</span>
  467. </div>
  468. <div class="endpoint">
  469. <span class="endpoint-method">DELETE /schedules/{type}/{id}</span>
  470. <span class="endpoint-desc">Delete a schedule</span>
  471. </div>
  472. <div class="endpoint">
  473. <span class="endpoint-method">GET /results</span>
  474. <span class="endpoint-desc">List execution results</span>
  475. </div>
  476. </div>
  477. <div class="note">
  478. <strong>Authentication required:</strong> All endpoints need <code>Authorization: Bearer YOUR_LETTA_API_KEY</code>
  479. </div>
  480. <h2>Links</h2>
  481. <div class="links">
  482. <a href="/dashboard">Dashboard</a>
  483. <a href="https://github.com/cpfiffer/letta-switchboard">Documentation</a>
  484. <a href="https://github.com/cpfiffer/letta-switchboard/issues">Support</a>
  485. </div>
  486. </div>
  487. </body>
  488. </html>
  489. """
  490. return HTMLResponse(content=html_content)
  491. return info
  492. @web_app.get("/dashboard")
  493. async def dashboard():
  494. """Dashboard UI for managing schedules."""
  495. try:
  496. with open("/root/dashboard.html", "r") as f:
  497. html_content = f.read()
  498. return HTMLResponse(content=html_content)
  499. except FileNotFoundError:
  500. raise HTTPException(status_code=404, detail="Dashboard not found")
  501. @web_app.post("/schedules/recurring")
  502. async def create_recurring_schedule(schedule: RecurringScheduleCreate, credentials: HTTPAuthorizationCredentials = Security(security)):
  503. api_key = credentials.credentials
  504. if not validate_api_key(api_key):
  505. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  506. # Inject api_key from Authorization header
  507. schedule_obj = RecurringSchedule(api_key=api_key, **schedule.model_dump())
  508. schedule_dict = schedule_obj.model_dump(mode='json')
  509. file_path = get_recurring_schedule_path(api_key, schedule_obj.id)
  510. save_schedule(file_path, schedule_dict)
  511. response_dict = schedule_dict.copy()
  512. response_dict.pop("api_key", None)
  513. return JSONResponse(content=response_dict, status_code=201)
  514. @web_app.post("/schedules/one-time")
  515. async def create_onetime_schedule(schedule: OneTimeScheduleCreate, credentials: HTTPAuthorizationCredentials = Security(security)):
  516. api_key = credentials.credentials
  517. if not validate_api_key(api_key):
  518. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  519. # Inject api_key from Authorization header
  520. schedule_obj = OneTimeSchedule(api_key=api_key, **schedule.model_dump())
  521. schedule_dict = schedule_obj.model_dump(mode='json')
  522. file_path = get_onetime_schedule_path(api_key, schedule.execute_at, schedule_obj.id)
  523. save_schedule(file_path, schedule_dict)
  524. response_dict = schedule_dict.copy()
  525. response_dict.pop("api_key", None)
  526. return JSONResponse(content=response_dict, status_code=201)
  527. @web_app.get("/schedules/recurring")
  528. async def list_recurring_schedules(credentials: HTTPAuthorizationCredentials = Security(security)):
  529. api_key = credentials.credentials
  530. if not validate_api_key(api_key):
  531. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  532. schedules = list_recurring_schedules_for_user(api_key)
  533. for schedule in schedules:
  534. schedule.pop("api_key", None)
  535. return JSONResponse(content=schedules)
  536. @web_app.get("/schedules/one-time")
  537. async def list_onetime_schedules(credentials: HTTPAuthorizationCredentials = Security(security)):
  538. api_key = credentials.credentials
  539. if not validate_api_key(api_key):
  540. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  541. schedules = list_onetime_schedules_for_user(api_key)
  542. for schedule in schedules:
  543. schedule.pop("api_key", None)
  544. return JSONResponse(content=schedules)
  545. @web_app.get("/schedules/recurring/{schedule_id}")
  546. async def get_recurring_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  547. api_key = credentials.credentials
  548. if not validate_api_key(api_key):
  549. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  550. file_path = get_recurring_schedule_path(api_key, schedule_id)
  551. schedule = load_schedule(file_path)
  552. if schedule is None:
  553. raise HTTPException(status_code=404, detail="Schedule not found")
  554. if schedule.get("api_key") != api_key:
  555. raise HTTPException(status_code=403, detail="Forbidden")
  556. schedule_copy = schedule.copy()
  557. schedule_copy.pop("api_key", None)
  558. return JSONResponse(content=schedule_copy)
  559. @web_app.get("/schedules/one-time/{schedule_id}")
  560. async def get_onetime_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  561. api_key = credentials.credentials
  562. if not validate_api_key(api_key):
  563. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  564. schedule, _ = find_onetime_schedule_for_user(api_key, schedule_id)
  565. if schedule is None:
  566. raise HTTPException(status_code=404, detail="Schedule not found")
  567. if schedule.get("api_key") != api_key:
  568. raise HTTPException(status_code=403, detail="Forbidden")
  569. schedule_copy = schedule.copy()
  570. schedule_copy.pop("api_key", None)
  571. return JSONResponse(content=schedule_copy)
  572. @web_app.delete("/schedules/recurring/{schedule_id}")
  573. async def delete_recurring_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  574. api_key = credentials.credentials
  575. if not validate_api_key(api_key):
  576. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  577. file_path = get_recurring_schedule_path(api_key, schedule_id)
  578. schedule = load_schedule(file_path)
  579. if schedule is None:
  580. raise HTTPException(status_code=404, detail="Schedule not found")
  581. if schedule.get("api_key") != api_key:
  582. raise HTTPException(status_code=403, detail="Forbidden")
  583. if delete_schedule(file_path):
  584. return JSONResponse(content={"message": "Schedule deleted"})
  585. else:
  586. raise HTTPException(status_code=404, detail="Schedule not found")
  587. @web_app.delete("/schedules/one-time/{schedule_id}")
  588. async def delete_onetime_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  589. api_key = credentials.credentials
  590. if not validate_api_key(api_key):
  591. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  592. schedule, file_path = find_onetime_schedule_for_user(api_key, schedule_id)
  593. if schedule is None:
  594. raise HTTPException(status_code=404, detail="Schedule not found")
  595. if schedule.get("api_key") != api_key:
  596. raise HTTPException(status_code=403, detail="Forbidden")
  597. if delete_schedule(file_path):
  598. return JSONResponse(content={"message": "Schedule deleted"})
  599. else:
  600. raise HTTPException(status_code=404, detail="Schedule not found")
  601. @web_app.get("/results")
  602. async def list_execution_results(credentials: HTTPAuthorizationCredentials = Security(security)):
  603. """List all execution results for the authenticated user."""
  604. api_key = credentials.credentials
  605. if not validate_api_key(api_key):
  606. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  607. api_key_hash = get_api_key_hash(api_key)
  608. results_dir = f"{RESULTS_BASE}/{api_key_hash}"
  609. results = []
  610. if Path(results_dir).exists():
  611. for result_file in Path(results_dir).glob("*.json"):
  612. try:
  613. with open(result_file, "rb") as f:
  614. encrypted_data = f.read()
  615. result = decrypt_json(encrypted_data, get_encryption_key_cached())
  616. results.append(result)
  617. except Exception as e:
  618. logger.error(f"Failed to load result {result_file}: {e}")
  619. return JSONResponse(content=results)
  620. @web_app.get("/results/{schedule_id}")
  621. async def get_execution_result(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  622. """Get execution result for a specific schedule."""
  623. api_key = credentials.credentials
  624. if not validate_api_key(api_key):
  625. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  626. api_key_hash = get_api_key_hash(api_key)
  627. result_file = f"{RESULTS_BASE}/{api_key_hash}/{schedule_id}.json"
  628. if not Path(result_file).exists():
  629. raise HTTPException(status_code=404, detail="Result not found")
  630. try:
  631. with open(result_file, "rb") as f:
  632. encrypted_data = f.read()
  633. result = decrypt_json(encrypted_data, get_encryption_key_cached())
  634. return JSONResponse(content=result)
  635. except Exception as e:
  636. logger.error(f"Failed to load result: {e}")
  637. raise HTTPException(status_code=500, detail="Failed to load result")
  638. @app.function(image=image, volumes={VOLUME_PATH: volume}, secrets=[encryption_secret] if encryption_secret else [])
  639. @modal.asgi_app()
  640. def api():
  641. return web_app
  642. @app.function(image=image, volumes={VOLUME_PATH: volume}, secrets=[encryption_secret] if encryption_secret else [])
  643. async def execute_schedule(
  644. schedule_id: str,
  645. agent_id: str,
  646. api_key: str,
  647. message: str,
  648. role: str,
  649. schedule_type: str,
  650. execute_at: str = None,
  651. ):
  652. logger.info(f"Executing {schedule_type} schedule {schedule_id} for agent {agent_id}")
  653. # Reload volume to see latest changes (in case schedule was deleted)
  654. volume.reload()
  655. # Check if schedule still exists before executing
  656. if schedule_type == "one-time" and execute_at:
  657. file_path = get_onetime_schedule_path(api_key, execute_at, schedule_id)
  658. else:
  659. file_path = get_recurring_schedule_path(api_key, schedule_id)
  660. schedule = load_schedule(file_path)
  661. if not schedule:
  662. # For one-time schedules, this is expected (already executed or deleted)
  663. # For recurring schedules, this indicates the schedule was deleted by the user
  664. if schedule_type == "one-time":
  665. logger.debug(f"One-time schedule {schedule_id} not found (already executed or deleted)")
  666. else:
  667. logger.info(f"Recurring schedule {schedule_id} was deleted, skipping execution")
  668. return {"success": False, "error": "Schedule deleted"}
  669. # For one-time schedules: DELETE IMMEDIATELY to prevent race condition
  670. # Filesystem becomes source of truth: if file exists, it hasn't run
  671. if schedule_type == "one-time":
  672. try:
  673. Path(file_path).unlink()
  674. volume.commit()
  675. logger.info(f"Deleted one-time schedule {schedule_id} to prevent re-execution")
  676. except Exception as e:
  677. logger.error(f"Failed to delete schedule {schedule_id}, may re-execute: {e}")
  678. return {"success": False, "error": "Could not lock schedule for execution"}
  679. # For recurring schedules: update last_run timestamp
  680. elif schedule_type == "recurring":
  681. schedule["last_run"] = datetime.utcnow().isoformat()
  682. save_schedule(file_path, schedule)
  683. logger.info(f"Updated last_run for recurring schedule {schedule_id}")
  684. # Execute the message
  685. result = await execute_letta_message(agent_id, api_key, message, role)
  686. # Always save execution result (success or failure)
  687. if result.get("success"):
  688. # Successful execution
  689. save_execution_result(
  690. api_key=api_key,
  691. schedule_id=schedule_id,
  692. schedule_type=schedule_type,
  693. agent_id=agent_id,
  694. message=message,
  695. run_id=result.get("run_id"),
  696. status="success"
  697. )
  698. else:
  699. # Failed execution - save error result
  700. error_msg = result.get("error", "Unknown error")
  701. logger.error(f"Execution failed for schedule {schedule_id}: {error_msg}")
  702. save_execution_result(
  703. api_key=api_key,
  704. schedule_id=schedule_id,
  705. schedule_type=schedule_type,
  706. agent_id=agent_id,
  707. message=message,
  708. error=error_msg,
  709. status="failed"
  710. )
  711. # Only terminate recurring schedules on permanent errors (401, 404)
  712. # Transient errors (timeouts, rate limits, 5xx) should not remove the schedule
  713. if schedule_type == "recurring" and result.get("permanent", False):
  714. try:
  715. Path(file_path).unlink()
  716. volume.commit()
  717. logger.warning(f"Terminated recurring schedule {schedule_id} due to permanent error: {error_msg}")
  718. except Exception as e:
  719. logger.error(f"Failed to delete failed recurring schedule {schedule_id}: {e}")
  720. return result
  721. def save_execution_result(api_key: str, schedule_id: str, schedule_type: str, agent_id: str, message: str, run_id: str = None, error: str = None, status: str = "success"):
  722. """Save execution result to results folder (success or failure)."""
  723. api_key_hash = get_api_key_hash(api_key)
  724. result_dir = f"{RESULTS_BASE}/{api_key_hash}"
  725. Path(result_dir).mkdir(parents=True, exist_ok=True)
  726. result_file = f"{result_dir}/{schedule_id}.json"
  727. result_data = {
  728. "schedule_id": schedule_id,
  729. "schedule_type": schedule_type,
  730. "status": status,
  731. "agent_id": agent_id,
  732. "message": message,
  733. "executed_at": datetime.utcnow().isoformat(),
  734. }
  735. # Add run_id only if present (successful execution)
  736. if run_id:
  737. result_data["run_id"] = run_id
  738. # Add error only if present (failed execution)
  739. if error:
  740. result_data["error"] = error
  741. encrypted_data = encrypt_json(result_data, get_encryption_key_cached())
  742. with open(result_file, "wb") as f:
  743. f.write(encrypted_data)
  744. volume.commit()
  745. logger.info(f"Saved execution result for schedule {schedule_id}, run_id: {run_id}")
  746. def cleanup_empty_directories():
  747. """Remove empty directories to keep filesystem clean."""
  748. removed_count = 0
  749. # Clean up one-time schedule directories (date/hour/user structure)
  750. onetime_base = f"{SCHEDULES_BASE}/one-time"
  751. if Path(onetime_base).exists():
  752. for date_dir in Path(onetime_base).iterdir():
  753. if not date_dir.is_dir():
  754. continue
  755. for hour_dir in date_dir.iterdir():
  756. if not hour_dir.is_dir():
  757. continue
  758. # Remove empty user directories
  759. for user_dir in hour_dir.iterdir():
  760. if user_dir.is_dir() and not any(user_dir.iterdir()):
  761. user_dir.rmdir()
  762. removed_count += 1
  763. logger.debug(f"Removed empty directory: {user_dir}")
  764. # Remove empty hour directory
  765. if not any(hour_dir.iterdir()):
  766. hour_dir.rmdir()
  767. removed_count += 1
  768. logger.debug(f"Removed empty directory: {hour_dir}")
  769. # Remove empty date directory
  770. if not any(date_dir.iterdir()):
  771. date_dir.rmdir()
  772. removed_count += 1
  773. logger.debug(f"Removed empty directory: {date_dir}")
  774. # Clean up recurring schedule directories (user structure only)
  775. recurring_base = f"{SCHEDULES_BASE}/recurring"
  776. if Path(recurring_base).exists():
  777. for user_dir in Path(recurring_base).iterdir():
  778. if user_dir.is_dir() and not any(user_dir.iterdir()):
  779. user_dir.rmdir()
  780. removed_count += 1
  781. logger.debug(f"Removed empty directory: {user_dir}")
  782. if removed_count > 0:
  783. logger.info(f"Cleanup: Removed {removed_count} empty directories")
  784. volume.commit()
  785. @app.function(
  786. image=image,
  787. volumes={VOLUME_PATH: volume},
  788. secrets=[encryption_secret] if encryption_secret else [],
  789. schedule=modal.Cron("* * * * *"),
  790. )
  791. async def check_and_execute_schedules():
  792. logger.info("Checking schedules...")
  793. # Reload volume to see latest changes (deletes, updates, etc.)
  794. volume.reload()
  795. current_time = datetime.now(timezone.utc)
  796. # Check recurring schedules (all users)
  797. recurring_schedules = list_all_recurring_schedules()
  798. for schedule in recurring_schedules:
  799. if is_recurring_schedule_due(schedule, current_time):
  800. logger.info(f"Executing recurring schedule {schedule['id']}")
  801. execute_schedule.spawn(
  802. schedule_id=schedule["id"],
  803. agent_id=schedule["agent_id"],
  804. api_key=schedule["api_key"],
  805. message=schedule["message"],
  806. role=schedule["role"],
  807. schedule_type="recurring",
  808. )
  809. # Check one-time schedules (only current date/hour bucket)
  810. date_str = current_time.strftime("%Y-%m-%d")
  811. hour_str = current_time.strftime("%H")
  812. onetime_schedules = list_onetime_schedules_for_time(date_str, hour_str)
  813. for schedule in onetime_schedules:
  814. if is_onetime_schedule_due(schedule, current_time):
  815. logger.info(f"Executing one-time schedule {schedule['id']}")
  816. execute_schedule.spawn(
  817. schedule_id=schedule["id"],
  818. agent_id=schedule["agent_id"],
  819. api_key=schedule["api_key"],
  820. message=schedule["message"],
  821. role=schedule["role"],
  822. schedule_type="one-time",
  823. execute_at=schedule["execute_at"],
  824. )
  825. # Clean up empty directories
  826. cleanup_empty_directories()
  827. logger.info("Schedule check complete")