app.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906
  1. import modal
  2. import json
  3. import logging
  4. from datetime import datetime, timezone
  5. from pathlib import Path
  6. from typing import List
  7. from fastapi import FastAPI, HTTPException, Header, Security, Request
  8. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  9. from fastapi.responses import JSONResponse, HTMLResponse
  10. from typing import Optional
  11. security = HTTPBearer()
  12. from models import (
  13. RecurringScheduleCreate,
  14. RecurringSchedule,
  15. OneTimeScheduleCreate,
  16. OneTimeSchedule,
  17. )
  18. from scheduler import is_recurring_schedule_due, is_onetime_schedule_due
  19. from letta_executor import execute_letta_message, validate_api_key
  20. from crypto_utils import get_api_key_hash, get_encryption_key, encrypt_json, decrypt_json
  21. from dateutil import parser as date_parser
  22. logging.basicConfig(level=logging.INFO)
  23. logger = logging.getLogger(__name__)
  24. app = modal.App("switchboard")
  25. import os as local_os
  26. # Read dev mode setting from local environment
  27. dev_mode_enabled = local_os.getenv("LETTA_SWITCHBOARD_DEV_MODE", "false")
  28. image = (
  29. modal.Image.debian_slim()
  30. .pip_install_from_requirements("requirements.txt")
  31. .env({"LETTA_SWITCHBOARD_DEV_MODE": dev_mode_enabled}) # Must come before add_local_*
  32. .add_local_python_source("models", "scheduler", "letta_executor", "crypto_utils")
  33. .add_local_file("dashboard.html", "/root/dashboard.html")
  34. )
  35. volume = modal.Volume.from_name("letta-switchboard-volume", create_if_missing=True)
  36. try:
  37. encryption_secret = modal.Secret.from_name("letta-switchboard-encryption")
  38. except Exception:
  39. logger.warning("letta-switchboard-encryption secret not found, will use env var or generate temporary key")
  40. encryption_secret = None
  41. VOLUME_PATH = "/data"
  42. SCHEDULES_BASE = f"{VOLUME_PATH}/schedules"
  43. RESULTS_BASE = f"{VOLUME_PATH}/results"
  44. web_app = FastAPI()
  45. # Lazy-load encryption key (will check env vars at runtime)
  46. _encryption_key = None
  47. def get_encryption_key_cached():
  48. global _encryption_key
  49. if _encryption_key is None:
  50. _encryption_key = get_encryption_key()
  51. return _encryption_key
  52. def get_recurring_schedule_path(api_key: str, schedule_id: str) -> str:
  53. """Get file path for recurring schedule."""
  54. api_key_hash = get_api_key_hash(api_key)
  55. return f"{SCHEDULES_BASE}/recurring/{api_key_hash}/{schedule_id}.json"
  56. def get_onetime_schedule_path(api_key: str, execute_at: str, schedule_id: str) -> str:
  57. """Get file path for one-time schedule with time bucketing."""
  58. api_key_hash = get_api_key_hash(api_key)
  59. dt = date_parser.parse(execute_at)
  60. date_str = dt.strftime("%Y-%m-%d")
  61. hour_str = dt.strftime("%H")
  62. return f"{SCHEDULES_BASE}/one-time/{date_str}/{hour_str}/{api_key_hash}/{schedule_id}.json"
  63. def save_schedule(file_path: str, schedule_data: dict):
  64. """Save encrypted schedule to file."""
  65. Path(file_path).parent.mkdir(parents=True, exist_ok=True)
  66. encrypted_data = encrypt_json(schedule_data, get_encryption_key_cached())
  67. with open(file_path, "wb") as f:
  68. f.write(encrypted_data)
  69. volume.commit()
  70. def load_schedule(file_path: str) -> dict:
  71. """Load and decrypt schedule from file."""
  72. try:
  73. with open(file_path, "rb") as f:
  74. encrypted_data = f.read()
  75. return decrypt_json(encrypted_data, get_encryption_key_cached())
  76. except FileNotFoundError:
  77. return None
  78. def delete_schedule(file_path: str):
  79. """Delete schedule file."""
  80. try:
  81. Path(file_path).unlink()
  82. volume.commit()
  83. return True
  84. except FileNotFoundError:
  85. return False
  86. def list_recurring_schedules_for_user(api_key: str) -> List[dict]:
  87. """List all recurring schedules for a specific user."""
  88. api_key_hash = get_api_key_hash(api_key)
  89. user_dir = f"{SCHEDULES_BASE}/recurring/{api_key_hash}"
  90. schedules = []
  91. if not Path(user_dir).exists():
  92. return schedules
  93. for file_path in Path(user_dir).glob("*.json"):
  94. try:
  95. with open(file_path, "rb") as f:
  96. encrypted_data = f.read()
  97. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  98. schedules.append(schedule)
  99. except Exception as e:
  100. logger.error(f"Failed to load schedule {file_path}: {e}")
  101. return schedules
  102. def list_onetime_schedules_for_user(api_key: str) -> List[dict]:
  103. """List all one-time schedules for a specific user."""
  104. api_key_hash = get_api_key_hash(api_key)
  105. base_dir = f"{SCHEDULES_BASE}/one-time"
  106. schedules = []
  107. if not Path(base_dir).exists():
  108. return schedules
  109. # Traverse all date/hour buckets
  110. for date_dir in Path(base_dir).iterdir():
  111. if not date_dir.is_dir():
  112. continue
  113. for hour_dir in date_dir.iterdir():
  114. if not hour_dir.is_dir():
  115. continue
  116. user_dir = hour_dir / api_key_hash
  117. if not user_dir.exists():
  118. continue
  119. for file_path in user_dir.glob("*.json"):
  120. try:
  121. with open(file_path, "rb") as f:
  122. encrypted_data = f.read()
  123. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  124. schedules.append(schedule)
  125. except Exception as e:
  126. logger.error(f"Failed to load schedule {file_path}: {e}")
  127. return schedules
  128. def list_all_recurring_schedules() -> List[dict]:
  129. """List all recurring schedules across all users (for cron job)."""
  130. schedules = []
  131. recurring_dir = f"{SCHEDULES_BASE}/recurring"
  132. if not Path(recurring_dir).exists():
  133. return schedules
  134. for user_dir in Path(recurring_dir).iterdir():
  135. if not user_dir.is_dir():
  136. continue
  137. for file_path in user_dir.glob("*.json"):
  138. try:
  139. with open(file_path, "rb") as f:
  140. encrypted_data = f.read()
  141. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  142. schedules.append(schedule)
  143. except Exception as e:
  144. logger.error(f"Failed to load schedule {file_path}: {e}")
  145. return schedules
  146. def list_onetime_schedules_for_time(date_str: str, hour_str: str) -> List[dict]:
  147. """List all one-time schedules for a specific date/hour (for cron job)."""
  148. schedules = []
  149. time_dir = f"{SCHEDULES_BASE}/one-time/{date_str}/{hour_str}"
  150. if not Path(time_dir).exists():
  151. return schedules
  152. for user_dir in Path(time_dir).iterdir():
  153. if not user_dir.is_dir():
  154. continue
  155. for file_path in user_dir.glob("*.json"):
  156. try:
  157. with open(file_path, "rb") as f:
  158. encrypted_data = f.read()
  159. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  160. schedules.append(schedule)
  161. except Exception as e:
  162. logger.error(f"Failed to load schedule {file_path}: {e}")
  163. return schedules
  164. def find_onetime_schedule_for_user(api_key: str, schedule_id: str) -> tuple[dict, str]:
  165. """Find a one-time schedule by ID for a specific user. Returns (schedule, file_path)."""
  166. api_key_hash = get_api_key_hash(api_key)
  167. base_dir = f"{SCHEDULES_BASE}/one-time"
  168. if not Path(base_dir).exists():
  169. return None, None
  170. # Search through all time buckets for this user
  171. for date_dir in Path(base_dir).iterdir():
  172. if not date_dir.is_dir():
  173. continue
  174. for hour_dir in date_dir.iterdir():
  175. if not hour_dir.is_dir():
  176. continue
  177. user_dir = hour_dir / api_key_hash
  178. if not user_dir.exists():
  179. continue
  180. file_path = user_dir / f"{schedule_id}.json"
  181. if file_path.exists():
  182. try:
  183. with open(file_path, "rb") as f:
  184. encrypted_data = f.read()
  185. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  186. return schedule, str(file_path)
  187. except Exception as e:
  188. logger.error(f"Failed to load schedule {file_path}: {e}")
  189. return None, None
  190. @web_app.get("/")
  191. async def root(request: Request):
  192. """Landing page with usage instructions."""
  193. # Check if browser is requesting HTML
  194. accept_header = request.headers.get("accept", "")
  195. wants_html = "text/html" in accept_header
  196. info = {
  197. "service": "Letta Switchboard",
  198. "description": "Free hosted message routing service for Letta agents",
  199. "version": "1.0.0",
  200. "documentation": "https://github.com/cpfiffer/letta-switchboard",
  201. "features": [
  202. "Send messages immediately or scheduled for later",
  203. "Recurring schedules with cron expressions",
  204. "Secure API key isolation",
  205. "Execution tracking with run IDs"
  206. ],
  207. "quick_start": {
  208. "curl_example": {
  209. "one_time": "curl -X POST https://letta--switchboard-api.modal.run/schedules/one-time -H 'Authorization: Bearer YOUR_LETTA_API_KEY' -H 'Content-Type: application/json' -d '{\"agent_id\": \"agent-xxx\", \"execute_at\": \"2025-11-13T09:00:00Z\", \"message\": \"Hello!\"}'",
  210. "recurring": "curl -X POST https://letta--switchboard-api.modal.run/schedules/recurring -H 'Authorization: Bearer YOUR_LETTA_API_KEY' -H 'Content-Type: application/json' -d '{\"agent_id\": \"agent-xxx\", \"cron\": \"0 9 * * 1-5\", \"message\": \"Daily standup\"}'"
  211. },
  212. "cli": {
  213. "install": "git clone https://github.com/cpfiffer/letta-switchboard.git && cd letta-switchboard/cli && go build -o letta-switchboard",
  214. "configure": "./letta-switchboard config set-api-key YOUR_LETTA_API_KEY",
  215. "send": "./letta-switchboard send --agent-id agent-xxx --message 'Hello!'",
  216. "schedule": "./letta-switchboard send --agent-id agent-xxx --message 'Reminder' --execute-at 'tomorrow at 9am'",
  217. "recurring": "./letta-switchboard recurring create --agent-id agent-xxx --message 'Daily standup' --cron 'every weekday'"
  218. }
  219. },
  220. "endpoints": {
  221. "POST /schedules/one-time": "Create a one-time schedule",
  222. "POST /schedules/recurring": "Create a recurring schedule",
  223. "GET /schedules/one-time": "List your one-time schedules",
  224. "GET /schedules/recurring": "List your recurring schedules",
  225. "GET /schedules/one-time/{id}": "Get specific one-time schedule",
  226. "GET /schedules/recurring/{id}": "Get specific recurring schedule",
  227. "DELETE /schedules/one-time/{id}": "Delete one-time schedule",
  228. "DELETE /schedules/recurring/{id}": "Delete recurring schedule",
  229. "GET /results": "List execution results",
  230. "GET /results/{schedule_id}": "Get result for specific schedule"
  231. },
  232. "authentication": "All endpoints require 'Authorization: Bearer YOUR_LETTA_API_KEY' header",
  233. "support": "https://github.com/cpfiffer/letta-switchboard/issues"
  234. }
  235. if wants_html:
  236. html_content = """
  237. <!DOCTYPE html>
  238. <html>
  239. <head>
  240. <title>Letta Switchboard</title>
  241. <meta charset="UTF-8">
  242. <meta name="viewport" content="width=device-width, initial-scale=1.0">
  243. <style>
  244. body {
  245. font-family: 'Courier New', 'Consolas', monospace;
  246. background: #f5f5dc;
  247. margin: 0;
  248. padding: 15px;
  249. line-height: 1.4;
  250. font-weight: 600;
  251. }
  252. .container {
  253. max-width: 900px;
  254. margin: 0 auto;
  255. background: white;
  256. padding: 30px;
  257. box-shadow: 0 0 20px rgba(0,0,0,0.1);
  258. }
  259. h1 {
  260. color: #000;
  261. margin-bottom: 8px;
  262. font-size: 24px;
  263. letter-spacing: 3px;
  264. font-weight: 900;
  265. text-transform: uppercase;
  266. border-bottom: 3px solid #000;
  267. padding-bottom: 8px;
  268. }
  269. h2 {
  270. color: #000;
  271. margin: 20px 0 10px;
  272. font-size: 16px;
  273. letter-spacing: 2px;
  274. font-weight: 900;
  275. text-transform: uppercase;
  276. border-bottom: 3px solid #000;
  277. padding-bottom: 3px;
  278. }
  279. .info-box {
  280. border: 3px solid #000;
  281. padding: 12px;
  282. margin: 15px 0;
  283. background: #fafafa;
  284. }
  285. .info-box pre {
  286. font-family: 'Courier New', monospace;
  287. font-size: 13px;
  288. line-height: 1.6;
  289. margin: 0;
  290. font-weight: 700;
  291. }
  292. .feature-list {
  293. list-style: none;
  294. padding: 0;
  295. margin: 10px 0;
  296. }
  297. .feature-list li {
  298. padding: 4px 0;
  299. font-weight: 600;
  300. }
  301. .feature-list li:before {
  302. content: "▪ ";
  303. color: #000;
  304. font-weight: 900;
  305. }
  306. code {
  307. background: #f0f0f0;
  308. padding: 2px 6px;
  309. border: 1px solid #000;
  310. font-weight: 700;
  311. }
  312. pre {
  313. background: #fafafa;
  314. border: 2px solid #000;
  315. padding: 12px;
  316. overflow-x: auto;
  317. margin: 10px 0;
  318. font-weight: 600;
  319. }
  320. .endpoint {
  321. background: #fafafa;
  322. padding: 8px 12px;
  323. margin: 6px 0;
  324. border: 2px solid #000;
  325. font-weight: 700;
  326. }
  327. a {
  328. color: #000;
  329. text-decoration: underline;
  330. font-weight: 900;
  331. }
  332. a:hover {
  333. background: #000;
  334. color: white;
  335. }
  336. .note {
  337. background: #fafafa;
  338. border: 3px solid #000;
  339. padding: 12px;
  340. margin: 15px 0;
  341. font-weight: 700;
  342. }
  343. </style>
  344. </head>
  345. <body>
  346. <div class="container">
  347. <h1>LETTA SWITCHBOARD</h1>
  348. <div class="info-box">
  349. <pre>SERVICE: MESSAGE ROUTING FOR LETTA AGENTS
  350. VERSION: 1.0.0
  351. STATUS: OPERATIONAL
  352. HOSTING: FREE SERVERLESS DEPLOYMENT</pre>
  353. </div>
  354. <h2>FEATURES</h2>
  355. <ul class="feature-list">
  356. <li>IMMEDIATE OR SCHEDULED MESSAGE DELIVERY</li>
  357. <li>RECURRING SCHEDULES WITH CRON EXPRESSIONS</li>
  358. <li>SECURE API KEY ISOLATION</li>
  359. <li>EXECUTION TRACKING WITH RUN IDS</li>
  360. </ul>
  361. <h2>QUICK START</h2>
  362. <div class="example">
  363. <p style="font-weight:900;margin:10px 0 6px 0;">SEND ONE-TIME MESSAGE:</p>
  364. <pre>curl -X POST https://letta--switchboard-api.modal.run/schedules/one-time \\
  365. -H 'Authorization: Bearer YOUR_LETTA_API_KEY' \\
  366. -H 'Content-Type: application/json' \\
  367. -d '{
  368. "agent_id": "agent-xxx",
  369. "execute_at": "2025-11-13T09:00:00Z",
  370. "message": "Hello from Switchboard!"
  371. }'</pre>
  372. </div>
  373. <div class="example">
  374. <p style="font-weight:900;margin:10px 0 6px 0;">CREATE RECURRING SCHEDULE:</p>
  375. <pre>curl -X POST https://letta--switchboard-api.modal.run/schedules/recurring \\
  376. -H 'Authorization: Bearer YOUR_LETTA_API_KEY' \\
  377. -H 'Content-Type: application/json' \\
  378. -d '{
  379. "agent_id": "agent-xxx",
  380. "cron": "0 9 * * 1-5",
  381. "message": "Daily standup reminder"
  382. }'</pre>
  383. </div>
  384. <h2>CLI TOOL</h2>
  385. <p style="margin:8px 0;font-weight:700;">NATURAL LANGUAGE SCHEDULING SUPPORT</p>
  386. <div class="example">
  387. <p style="font-weight:900;margin:10px 0 6px 0;">INSTALLATION:</p>
  388. <pre>git clone https://github.com/cpfiffer/letta-switchboard.git
  389. cd letta-switchboard/cli
  390. go build -o letta-switchboard
  391. ./letta-switchboard config set-api-key YOUR_LETTA_API_KEY</pre>
  392. </div>
  393. <div class="example">
  394. <p style="font-weight:900;margin:10px 0 6px 0;">USAGE:</p>
  395. <pre># Send immediately
  396. ./letta-switchboard send --agent-id agent-xxx --message "Hello!"
  397. # Schedule with natural language
  398. ./letta-switchboard send --agent-id agent-xxx --message "Reminder" --execute-at "tomorrow at 9am"
  399. # Recurring schedule
  400. ./letta-switchboard recurring create --agent-id agent-xxx --message "Daily standup" --cron "every weekday"</pre>
  401. </div>
  402. <h2>API ENDPOINTS</h2>
  403. <div class="endpoint"><code>POST /schedules/one-time</code> - Create a one-time schedule</div>
  404. <div class="endpoint"><code>POST /schedules/recurring</code> - Create a recurring schedule</div>
  405. <div class="endpoint"><code>GET /schedules/one-time</code> - List your one-time schedules</div>
  406. <div class="endpoint"><code>GET /schedules/recurring</code> - List your recurring schedules</div>
  407. <div class="endpoint"><code>GET /schedules/one-time/{id}</code> - Get specific one-time schedule</div>
  408. <div class="endpoint"><code>GET /schedules/recurring/{id}</code> - Get specific recurring schedule</div>
  409. <div class="endpoint"><code>DELETE /schedules/one-time/{id}</code> - Delete one-time schedule</div>
  410. <div class="endpoint"><code>DELETE /schedules/recurring/{id}</code> - Delete recurring schedule</div>
  411. <div class="endpoint"><code>GET /results</code> - List execution results</div>
  412. <div class="endpoint"><code>GET /results/{schedule_id}</code> - Get result for specific schedule</div>
  413. <div class="note">
  414. <strong>AUTHENTICATION:</strong> ALL ENDPOINTS REQUIRE<br>
  415. <code>Authorization: Bearer YOUR_LETTA_API_KEY</code>
  416. </div>
  417. <h2>DOCUMENTATION & SUPPORT</h2>
  418. <p style="margin: 8px 0;"><a href="/dashboard">WEB DASHBOARD</a> - Manage schedules in browser</p>
  419. <p style="margin: 8px 0;"><a href="https://github.com/cpfiffer/letta-switchboard">DOCUMENTATION</a> - Full technical reference</p>
  420. <p style="margin: 8px 0;"><a href="https://github.com/cpfiffer/letta-switchboard/issues">SUPPORT</a> - Issue tracker</p>
  421. <p style="margin: 8px 0;"><a href="/?json">JSON API</a> - View as JSON response</p>
  422. </div>
  423. </body>
  424. </html>
  425. """
  426. return HTMLResponse(content=html_content)
  427. return info
  428. @web_app.get("/dashboard")
  429. async def dashboard():
  430. """Dashboard UI for managing schedules."""
  431. try:
  432. with open("/root/dashboard.html", "r") as f:
  433. html_content = f.read()
  434. return HTMLResponse(content=html_content)
  435. except FileNotFoundError:
  436. raise HTTPException(status_code=404, detail="Dashboard not found")
  437. @web_app.post("/schedules/recurring")
  438. async def create_recurring_schedule(schedule: RecurringScheduleCreate, credentials: HTTPAuthorizationCredentials = Security(security)):
  439. api_key = credentials.credentials
  440. if not validate_api_key(api_key):
  441. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  442. # Inject api_key from Authorization header
  443. schedule_obj = RecurringSchedule(api_key=api_key, **schedule.model_dump())
  444. schedule_dict = schedule_obj.model_dump(mode='json')
  445. file_path = get_recurring_schedule_path(api_key, schedule_obj.id)
  446. save_schedule(file_path, schedule_dict)
  447. response_dict = schedule_dict.copy()
  448. response_dict.pop("api_key", None)
  449. return JSONResponse(content=response_dict, status_code=201)
  450. @web_app.post("/schedules/one-time")
  451. async def create_onetime_schedule(schedule: OneTimeScheduleCreate, credentials: HTTPAuthorizationCredentials = Security(security)):
  452. api_key = credentials.credentials
  453. if not validate_api_key(api_key):
  454. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  455. # Inject api_key from Authorization header
  456. schedule_obj = OneTimeSchedule(api_key=api_key, **schedule.model_dump())
  457. schedule_dict = schedule_obj.model_dump(mode='json')
  458. file_path = get_onetime_schedule_path(api_key, schedule.execute_at, schedule_obj.id)
  459. save_schedule(file_path, schedule_dict)
  460. response_dict = schedule_dict.copy()
  461. response_dict.pop("api_key", None)
  462. return JSONResponse(content=response_dict, status_code=201)
  463. @web_app.get("/schedules/recurring")
  464. async def list_recurring_schedules(credentials: HTTPAuthorizationCredentials = Security(security)):
  465. api_key = credentials.credentials
  466. if not validate_api_key(api_key):
  467. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  468. schedules = list_recurring_schedules_for_user(api_key)
  469. for schedule in schedules:
  470. schedule.pop("api_key", None)
  471. return JSONResponse(content=schedules)
  472. @web_app.get("/schedules/one-time")
  473. async def list_onetime_schedules(credentials: HTTPAuthorizationCredentials = Security(security)):
  474. api_key = credentials.credentials
  475. if not validate_api_key(api_key):
  476. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  477. schedules = list_onetime_schedules_for_user(api_key)
  478. for schedule in schedules:
  479. schedule.pop("api_key", None)
  480. return JSONResponse(content=schedules)
  481. @web_app.get("/schedules/recurring/{schedule_id}")
  482. async def get_recurring_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  483. api_key = credentials.credentials
  484. if not validate_api_key(api_key):
  485. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  486. file_path = get_recurring_schedule_path(api_key, schedule_id)
  487. schedule = load_schedule(file_path)
  488. if schedule is None:
  489. raise HTTPException(status_code=404, detail="Schedule not found")
  490. if schedule.get("api_key") != api_key:
  491. raise HTTPException(status_code=403, detail="Forbidden")
  492. schedule_copy = schedule.copy()
  493. schedule_copy.pop("api_key", None)
  494. return JSONResponse(content=schedule_copy)
  495. @web_app.get("/schedules/one-time/{schedule_id}")
  496. async def get_onetime_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  497. api_key = credentials.credentials
  498. if not validate_api_key(api_key):
  499. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  500. schedule, _ = find_onetime_schedule_for_user(api_key, schedule_id)
  501. if schedule is None:
  502. raise HTTPException(status_code=404, detail="Schedule not found")
  503. if schedule.get("api_key") != api_key:
  504. raise HTTPException(status_code=403, detail="Forbidden")
  505. schedule_copy = schedule.copy()
  506. schedule_copy.pop("api_key", None)
  507. return JSONResponse(content=schedule_copy)
  508. @web_app.delete("/schedules/recurring/{schedule_id}")
  509. async def delete_recurring_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  510. api_key = credentials.credentials
  511. if not validate_api_key(api_key):
  512. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  513. file_path = get_recurring_schedule_path(api_key, schedule_id)
  514. schedule = load_schedule(file_path)
  515. if schedule is None:
  516. raise HTTPException(status_code=404, detail="Schedule not found")
  517. if schedule.get("api_key") != api_key:
  518. raise HTTPException(status_code=403, detail="Forbidden")
  519. if delete_schedule(file_path):
  520. return JSONResponse(content={"message": "Schedule deleted"})
  521. else:
  522. raise HTTPException(status_code=404, detail="Schedule not found")
  523. @web_app.delete("/schedules/one-time/{schedule_id}")
  524. async def delete_onetime_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  525. api_key = credentials.credentials
  526. if not validate_api_key(api_key):
  527. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  528. schedule, file_path = find_onetime_schedule_for_user(api_key, schedule_id)
  529. if schedule is None:
  530. raise HTTPException(status_code=404, detail="Schedule not found")
  531. if schedule.get("api_key") != api_key:
  532. raise HTTPException(status_code=403, detail="Forbidden")
  533. if delete_schedule(file_path):
  534. return JSONResponse(content={"message": "Schedule deleted"})
  535. else:
  536. raise HTTPException(status_code=404, detail="Schedule not found")
  537. @web_app.get("/results")
  538. async def list_execution_results(credentials: HTTPAuthorizationCredentials = Security(security)):
  539. """List all execution results for the authenticated user."""
  540. api_key = credentials.credentials
  541. if not validate_api_key(api_key):
  542. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  543. api_key_hash = get_api_key_hash(api_key)
  544. results_dir = f"{RESULTS_BASE}/{api_key_hash}"
  545. results = []
  546. if Path(results_dir).exists():
  547. for result_file in Path(results_dir).glob("*.json"):
  548. try:
  549. with open(result_file, "rb") as f:
  550. encrypted_data = f.read()
  551. result = decrypt_json(encrypted_data, get_encryption_key_cached())
  552. results.append(result)
  553. except Exception as e:
  554. logger.error(f"Failed to load result {result_file}: {e}")
  555. return JSONResponse(content=results)
  556. @web_app.get("/results/{schedule_id}")
  557. async def get_execution_result(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  558. """Get execution result for a specific schedule."""
  559. api_key = credentials.credentials
  560. if not validate_api_key(api_key):
  561. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  562. api_key_hash = get_api_key_hash(api_key)
  563. result_file = f"{RESULTS_BASE}/{api_key_hash}/{schedule_id}.json"
  564. if not Path(result_file).exists():
  565. raise HTTPException(status_code=404, detail="Result not found")
  566. try:
  567. with open(result_file, "rb") as f:
  568. encrypted_data = f.read()
  569. result = decrypt_json(encrypted_data, get_encryption_key_cached())
  570. return JSONResponse(content=result)
  571. except Exception as e:
  572. logger.error(f"Failed to load result: {e}")
  573. raise HTTPException(status_code=500, detail="Failed to load result")
  574. @app.function(image=image, volumes={VOLUME_PATH: volume}, secrets=[encryption_secret] if encryption_secret else [])
  575. @modal.asgi_app()
  576. def api():
  577. return web_app
  578. @app.function(image=image, volumes={VOLUME_PATH: volume}, secrets=[encryption_secret] if encryption_secret else [])
  579. async def execute_schedule(
  580. schedule_id: str,
  581. agent_id: str,
  582. api_key: str,
  583. message: str,
  584. role: str,
  585. schedule_type: str,
  586. execute_at: str = None,
  587. ):
  588. logger.info(f"Executing {schedule_type} schedule {schedule_id} for agent {agent_id}")
  589. # Reload volume to see latest changes (in case schedule was deleted)
  590. volume.reload()
  591. # Check if schedule still exists before executing
  592. if schedule_type == "one-time" and execute_at:
  593. file_path = get_onetime_schedule_path(api_key, execute_at, schedule_id)
  594. else:
  595. file_path = get_recurring_schedule_path(api_key, schedule_id)
  596. schedule = load_schedule(file_path)
  597. if not schedule:
  598. # For one-time schedules, this is expected (already executed or deleted)
  599. # For recurring schedules, this indicates the schedule was deleted by the user
  600. if schedule_type == "one-time":
  601. logger.debug(f"One-time schedule {schedule_id} not found (already executed or deleted)")
  602. else:
  603. logger.info(f"Recurring schedule {schedule_id} was deleted, skipping execution")
  604. return {"success": False, "error": "Schedule deleted"}
  605. # For one-time schedules: DELETE IMMEDIATELY to prevent race condition
  606. # Filesystem becomes source of truth: if file exists, it hasn't run
  607. if schedule_type == "one-time":
  608. try:
  609. Path(file_path).unlink()
  610. volume.commit()
  611. logger.info(f"Deleted one-time schedule {schedule_id} to prevent re-execution")
  612. except Exception as e:
  613. logger.error(f"Failed to delete schedule {schedule_id}, may re-execute: {e}")
  614. return {"success": False, "error": "Could not lock schedule for execution"}
  615. # For recurring schedules: update last_run timestamp
  616. elif schedule_type == "recurring":
  617. schedule["last_run"] = datetime.utcnow().isoformat()
  618. save_schedule(file_path, schedule)
  619. logger.info(f"Updated last_run for recurring schedule {schedule_id}")
  620. # Execute the message
  621. result = await execute_letta_message(agent_id, api_key, message, role)
  622. # Always save execution result (success or failure)
  623. if result.get("success"):
  624. # Successful execution
  625. save_execution_result(
  626. api_key=api_key,
  627. schedule_id=schedule_id,
  628. schedule_type=schedule_type,
  629. agent_id=agent_id,
  630. message=message,
  631. run_id=result.get("run_id"),
  632. status="success"
  633. )
  634. else:
  635. # Failed execution - save error result
  636. error_msg = result.get("error", "Unknown error")
  637. logger.error(f"Execution failed for schedule {schedule_id}: {error_msg}")
  638. save_execution_result(
  639. api_key=api_key,
  640. schedule_id=schedule_id,
  641. schedule_type=schedule_type,
  642. agent_id=agent_id,
  643. message=message,
  644. error=error_msg,
  645. status="failed"
  646. )
  647. # Terminate recurring schedules on failure (no retries)
  648. if schedule_type == "recurring":
  649. try:
  650. Path(file_path).unlink()
  651. volume.commit()
  652. logger.warning(f"Terminated recurring schedule {schedule_id} due to execution failure: {error_msg}")
  653. except Exception as e:
  654. logger.error(f"Failed to delete failed recurring schedule {schedule_id}: {e}")
  655. return result
  def save_execution_result(
      api_key: str,
      schedule_id: str,
      schedule_type: str,
      agent_id: str,
      message: str,
      run_id: str = None,
      error: str = None,
      status: str = "success",
  ):
      """Write an encrypted record of one schedule execution to the results area.

      The record is stored at ``{RESULTS_BASE}/{api-key hash}/{schedule_id}.json``
      and the file is opened in ``"wb"`` mode, so a later result for the same
      schedule id replaces the earlier file.

      Args:
          api_key: Key whose hash (from ``get_api_key_hash``) names the folder.
          schedule_id: Schedule identifier; also the base name of the result file.
          schedule_type: Stored verbatim in the record.
          agent_id: Stored verbatim in the record.
          message: Stored verbatim in the record.
          run_id: Added under ``"run_id"`` only when truthy.
          error: Added under ``"error"`` only when truthy.
          status: Stored verbatim; defaults to ``"success"``.
      """
      # Per-user results folder, created on demand
      user_results_dir = f"{RESULTS_BASE}/{get_api_key_hash(api_key)}"
      Path(user_results_dir).mkdir(parents=True, exist_ok=True)
      destination = f"{user_results_dir}/{schedule_id}.json"

      record = {
          "schedule_id": schedule_id,
          "schedule_type": schedule_type,
          "status": status,
          "agent_id": agent_id,
          "message": message,
          "executed_at": datetime.utcnow().isoformat(),
      }

      # Optional fields (run_id on success, error on failure) are written
      # only when they actually carry a value
      for field_name, field_value in (("run_id", run_id), ("error", error)):
          if field_value:
              record[field_name] = field_value

      ciphertext = encrypt_json(record, get_encryption_key_cached())
      with open(destination, "wb") as out:
          out.write(ciphertext)

      # Persist the new file to the volume
      volume.commit()
      logger.info(f"Saved execution result for schedule {schedule_id}, run_id: {run_id}")
  def cleanup_empty_directories():
      """Delete empty schedule directories and commit the volume if any were removed.

      One-time schedules are walked as ``one-time/<date>/<hour>/<user>``: empty
      user directories go first, then the hour directory if that left it empty,
      then the date directory. Recurring schedules are walked as
      ``recurring/<user>`` and only empty user directories are removed.
      """
      removed_count = 0

      def remove_if_empty(directory):
          """Remove ``directory`` when it has no entries; return 1 if removed, else 0."""
          if any(directory.iterdir()):
              return 0
          directory.rmdir()
          logger.debug(f"Removed empty directory: {directory}")
          return 1

      # One-time schedules: date/hour/user structure, pruned bottom-up
      onetime_root = Path(f"{SCHEDULES_BASE}/one-time")
      if onetime_root.exists():
          for date_dir in onetime_root.iterdir():
              if not date_dir.is_dir():
                  continue
              for hour_dir in date_dir.iterdir():
                  if not hour_dir.is_dir():
                      continue
                  for user_dir in hour_dir.iterdir():
                      # Plain files inside an hour directory are left alone
                      if user_dir.is_dir():
                          removed_count += remove_if_empty(user_dir)
                  # The hour directory may have just become empty
                  removed_count += remove_if_empty(hour_dir)
              # Likewise for the date directory once its hours are processed
              removed_count += remove_if_empty(date_dir)

      # Recurring schedules: user structure only
      recurring_root = Path(f"{SCHEDULES_BASE}/recurring")
      if recurring_root.exists():
          for user_dir in recurring_root.iterdir():
              if user_dir.is_dir():
                  removed_count += remove_if_empty(user_dir)

      # Only touch the volume when something actually changed
      if removed_count > 0:
          logger.info(f"Cleanup: Removed {removed_count} empty directories")
          volume.commit()
  @app.function(
      image=image,
      volumes={VOLUME_PATH: volume},
      secrets=[encryption_secret] if encryption_secret else [],
      schedule=modal.Cron("* * * * *"),
  )
  async def check_and_execute_schedules():
      """Cron entry point: spawn an execution for every schedule that is due.

      Registered with a once-a-minute cron. Each tick reloads the volume, asks
      ``is_recurring_schedule_due`` / ``is_onetime_schedule_due`` about every
      candidate schedule, hands due ones to ``execute_schedule.spawn`` and
      finally prunes empty schedule directories.

      One-time schedules are looked up in the current hour bucket and in the
      previous hour's bucket, so a schedule that came due after the last tick
      of an hour (or while a tick was late or skipped) is still picked up
      after the hour rolls over.

      A failure while dispatching one schedule (for example a record missing
      a key, or ``spawn`` raising) is logged and does not prevent the
      remaining schedules from being dispatched in the same tick.
      """
      # Local import so this block does not depend on the file-level imports
      from datetime import timedelta

      def schedule_label(schedule):
          # Log only the id, never the whole record: it carries the API key
          if isinstance(schedule, dict):
              return schedule.get("id", "<unknown>")
          return "<unknown>"

      logger.info("Checking schedules...")
      # Reload volume to see latest changes (deletes, updates, etc.)
      volume.reload()
      current_time = datetime.now(timezone.utc)

      # Check recurring schedules (all users)
      for schedule in list_all_recurring_schedules():
          try:
              if not is_recurring_schedule_due(schedule, current_time):
                  continue
              logger.info(f"Executing recurring schedule {schedule['id']}")
              execute_schedule.spawn(
                  schedule_id=schedule["id"],
                  agent_id=schedule["agent_id"],
                  api_key=schedule["api_key"],
                  message=schedule["message"],
                  role=schedule["role"],
                  schedule_type="recurring",
              )
          except Exception as e:
              # Isolate the failure so one bad record cannot block the others
              logger.error(f"Failed to dispatch recurring schedule {schedule_label(schedule)}: {e}")

      # Check one-time schedules in the previous and the current date/hour bucket.
      # NOTE(review): assumes buckets are keyed by the UTC date/hour of execute_at
      # and that is_onetime_schedule_due() reports past-due schedules as due;
      # confirm against those helpers.
      previous_hour = current_time - timedelta(hours=1)
      for bucket_time in (previous_hour, current_time):
          date_str = bucket_time.strftime("%Y-%m-%d")
          hour_str = bucket_time.strftime("%H")
          for schedule in list_onetime_schedules_for_time(date_str, hour_str):
              try:
                  if not is_onetime_schedule_due(schedule, current_time):
                      continue
                  logger.info(f"Executing one-time schedule {schedule['id']}")
                  execute_schedule.spawn(
                      schedule_id=schedule["id"],
                      agent_id=schedule["agent_id"],
                      api_key=schedule["api_key"],
                      message=schedule["message"],
                      role=schedule["role"],
                      schedule_type="one-time",
                      execute_at=schedule["execute_at"],
                  )
              except Exception as e:
                  logger.error(f"Failed to dispatch one-time schedule {schedule_label(schedule)}: {e}")

      # Clean up empty directories
      cleanup_empty_directories()
      logger.info("Schedule check complete")