app.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649
  1. import modal
  2. import json
  3. import logging
  4. from datetime import datetime, timezone
  5. from pathlib import Path
  6. from typing import List
  7. from fastapi import FastAPI, HTTPException, Header, Security
  8. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  9. from fastapi.responses import JSONResponse
  10. from typing import Optional
  11. security = HTTPBearer()
  12. from models import (
  13. RecurringScheduleCreate,
  14. RecurringSchedule,
  15. OneTimeScheduleCreate,
  16. OneTimeSchedule,
  17. )
  18. from scheduler import is_recurring_schedule_due, is_onetime_schedule_due
  19. from letta_executor import execute_letta_message, validate_api_key
  20. from crypto_utils import get_api_key_hash, get_encryption_key, encrypt_json, decrypt_json
  21. from dateutil import parser as date_parser
  22. logging.basicConfig(level=logging.INFO)
  23. logger = logging.getLogger(__name__)
  24. app = modal.App("switchboard")
  25. import os as local_os
  26. # Read dev mode setting from local environment
  27. dev_mode_enabled = local_os.getenv("LETTA_SWITCHBOARD_DEV_MODE", "false")
  28. image = (
  29. modal.Image.debian_slim()
  30. .pip_install_from_requirements("requirements.txt")
  31. .env({"LETTA_SWITCHBOARD_DEV_MODE": dev_mode_enabled}) # Must come before add_local_*
  32. .add_local_python_source("models", "scheduler", "letta_executor", "crypto_utils")
  33. )
  34. volume = modal.Volume.from_name("letta-switchboard-volume", create_if_missing=True)
  35. try:
  36. encryption_secret = modal.Secret.from_name("letta-switchboard-encryption")
  37. except Exception:
  38. logger.warning("letta-switchboard-encryption secret not found, will use env var or generate temporary key")
  39. encryption_secret = None
  40. VOLUME_PATH = "/data"
  41. SCHEDULES_BASE = f"{VOLUME_PATH}/schedules"
  42. RESULTS_BASE = f"{VOLUME_PATH}/results"
  43. web_app = FastAPI()
  44. # Lazy-load encryption key (will check env vars at runtime)
  45. _encryption_key = None
  46. def get_encryption_key_cached():
  47. global _encryption_key
  48. if _encryption_key is None:
  49. _encryption_key = get_encryption_key()
  50. return _encryption_key
  51. def get_recurring_schedule_path(api_key: str, schedule_id: str) -> str:
  52. """Get file path for recurring schedule."""
  53. api_key_hash = get_api_key_hash(api_key)
  54. return f"{SCHEDULES_BASE}/recurring/{api_key_hash}/{schedule_id}.json"
  55. def get_onetime_schedule_path(api_key: str, execute_at: str, schedule_id: str) -> str:
  56. """Get file path for one-time schedule with time bucketing."""
  57. api_key_hash = get_api_key_hash(api_key)
  58. dt = date_parser.parse(execute_at)
  59. date_str = dt.strftime("%Y-%m-%d")
  60. hour_str = dt.strftime("%H")
  61. return f"{SCHEDULES_BASE}/one-time/{date_str}/{hour_str}/{api_key_hash}/{schedule_id}.json"
  62. def save_schedule(file_path: str, schedule_data: dict):
  63. """Save encrypted schedule to file."""
  64. Path(file_path).parent.mkdir(parents=True, exist_ok=True)
  65. encrypted_data = encrypt_json(schedule_data, get_encryption_key_cached())
  66. with open(file_path, "wb") as f:
  67. f.write(encrypted_data)
  68. volume.commit()
  69. def load_schedule(file_path: str) -> dict:
  70. """Load and decrypt schedule from file."""
  71. try:
  72. with open(file_path, "rb") as f:
  73. encrypted_data = f.read()
  74. return decrypt_json(encrypted_data, get_encryption_key_cached())
  75. except FileNotFoundError:
  76. return None
  77. def delete_schedule(file_path: str):
  78. """Delete schedule file."""
  79. try:
  80. Path(file_path).unlink()
  81. volume.commit()
  82. return True
  83. except FileNotFoundError:
  84. return False
  85. def list_recurring_schedules_for_user(api_key: str) -> List[dict]:
  86. """List all recurring schedules for a specific user."""
  87. api_key_hash = get_api_key_hash(api_key)
  88. user_dir = f"{SCHEDULES_BASE}/recurring/{api_key_hash}"
  89. schedules = []
  90. if not Path(user_dir).exists():
  91. return schedules
  92. for file_path in Path(user_dir).glob("*.json"):
  93. try:
  94. with open(file_path, "rb") as f:
  95. encrypted_data = f.read()
  96. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  97. schedules.append(schedule)
  98. except Exception as e:
  99. logger.error(f"Failed to load schedule {file_path}: {e}")
  100. return schedules
  101. def list_onetime_schedules_for_user(api_key: str) -> List[dict]:
  102. """List all one-time schedules for a specific user."""
  103. api_key_hash = get_api_key_hash(api_key)
  104. base_dir = f"{SCHEDULES_BASE}/one-time"
  105. schedules = []
  106. if not Path(base_dir).exists():
  107. return schedules
  108. # Traverse all date/hour buckets
  109. for date_dir in Path(base_dir).iterdir():
  110. if not date_dir.is_dir():
  111. continue
  112. for hour_dir in date_dir.iterdir():
  113. if not hour_dir.is_dir():
  114. continue
  115. user_dir = hour_dir / api_key_hash
  116. if not user_dir.exists():
  117. continue
  118. for file_path in user_dir.glob("*.json"):
  119. try:
  120. with open(file_path, "rb") as f:
  121. encrypted_data = f.read()
  122. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  123. schedules.append(schedule)
  124. except Exception as e:
  125. logger.error(f"Failed to load schedule {file_path}: {e}")
  126. return schedules
  127. def list_all_recurring_schedules() -> List[dict]:
  128. """List all recurring schedules across all users (for cron job)."""
  129. schedules = []
  130. recurring_dir = f"{SCHEDULES_BASE}/recurring"
  131. if not Path(recurring_dir).exists():
  132. return schedules
  133. for user_dir in Path(recurring_dir).iterdir():
  134. if not user_dir.is_dir():
  135. continue
  136. for file_path in user_dir.glob("*.json"):
  137. try:
  138. with open(file_path, "rb") as f:
  139. encrypted_data = f.read()
  140. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  141. schedules.append(schedule)
  142. except Exception as e:
  143. logger.error(f"Failed to load schedule {file_path}: {e}")
  144. return schedules
  145. def list_onetime_schedules_for_time(date_str: str, hour_str: str) -> List[dict]:
  146. """List all one-time schedules for a specific date/hour (for cron job)."""
  147. schedules = []
  148. time_dir = f"{SCHEDULES_BASE}/one-time/{date_str}/{hour_str}"
  149. if not Path(time_dir).exists():
  150. return schedules
  151. for user_dir in Path(time_dir).iterdir():
  152. if not user_dir.is_dir():
  153. continue
  154. for file_path in user_dir.glob("*.json"):
  155. try:
  156. with open(file_path, "rb") as f:
  157. encrypted_data = f.read()
  158. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  159. schedules.append(schedule)
  160. except Exception as e:
  161. logger.error(f"Failed to load schedule {file_path}: {e}")
  162. return schedules
  163. def find_onetime_schedule_for_user(api_key: str, schedule_id: str) -> tuple[dict, str]:
  164. """Find a one-time schedule by ID for a specific user. Returns (schedule, file_path)."""
  165. api_key_hash = get_api_key_hash(api_key)
  166. base_dir = f"{SCHEDULES_BASE}/one-time"
  167. if not Path(base_dir).exists():
  168. return None, None
  169. # Search through all time buckets for this user
  170. for date_dir in Path(base_dir).iterdir():
  171. if not date_dir.is_dir():
  172. continue
  173. for hour_dir in date_dir.iterdir():
  174. if not hour_dir.is_dir():
  175. continue
  176. user_dir = hour_dir / api_key_hash
  177. if not user_dir.exists():
  178. continue
  179. file_path = user_dir / f"{schedule_id}.json"
  180. if file_path.exists():
  181. try:
  182. with open(file_path, "rb") as f:
  183. encrypted_data = f.read()
  184. schedule = decrypt_json(encrypted_data, get_encryption_key_cached())
  185. return schedule, str(file_path)
  186. except Exception as e:
  187. logger.error(f"Failed to load schedule {file_path}: {e}")
  188. return None, None
  189. @web_app.get("/")
  190. async def root():
  191. """Landing page with usage instructions."""
  192. return {
  193. "service": "Letta Switchboard",
  194. "description": "Free hosted message routing service for Letta agents",
  195. "version": "1.0.0",
  196. "documentation": "https://github.com/cpfiffer/letta-switchboard",
  197. "features": [
  198. "Send messages immediately or scheduled for later",
  199. "Recurring schedules with cron expressions",
  200. "Secure API key isolation",
  201. "Execution tracking with run IDs"
  202. ],
  203. "quick_start": {
  204. "curl_example": {
  205. "one_time": "curl -X POST https://letta--switchboard-api.modal.run/schedules/one-time -H 'Authorization: Bearer YOUR_LETTA_API_KEY' -H 'Content-Type: application/json' -d '{\"agent_id\": \"agent-xxx\", \"execute_at\": \"2025-11-13T09:00:00Z\", \"message\": \"Hello!\"}'",
  206. "recurring": "curl -X POST https://letta--switchboard-api.modal.run/schedules/recurring -H 'Authorization: Bearer YOUR_LETTA_API_KEY' -H 'Content-Type: application/json' -d '{\"agent_id\": \"agent-xxx\", \"cron\": \"0 9 * * 1-5\", \"message\": \"Daily standup\"}'"
  207. },
  208. "cli": {
  209. "install": "git clone https://github.com/cpfiffer/letta-switchboard.git && cd letta-switchboard/cli && go build -o letta-switchboard",
  210. "configure": "./letta-switchboard config set-api-key YOUR_LETTA_API_KEY",
  211. "send": "./letta-switchboard send --agent-id agent-xxx --message 'Hello!'",
  212. "schedule": "./letta-switchboard send --agent-id agent-xxx --message 'Reminder' --execute-at 'tomorrow at 9am'",
  213. "recurring": "./letta-switchboard recurring create --agent-id agent-xxx --message 'Daily standup' --cron 'every weekday'"
  214. }
  215. },
  216. "endpoints": {
  217. "POST /schedules/one-time": "Create a one-time schedule",
  218. "POST /schedules/recurring": "Create a recurring schedule",
  219. "GET /schedules/one-time": "List your one-time schedules",
  220. "GET /schedules/recurring": "List your recurring schedules",
  221. "GET /schedules/one-time/{id}": "Get specific one-time schedule",
  222. "GET /schedules/recurring/{id}": "Get specific recurring schedule",
  223. "DELETE /schedules/one-time/{id}": "Delete one-time schedule",
  224. "DELETE /schedules/recurring/{id}": "Delete recurring schedule",
  225. "GET /results": "List execution results",
  226. "GET /results/{schedule_id}": "Get result for specific schedule"
  227. },
  228. "authentication": "All endpoints require 'Authorization: Bearer YOUR_LETTA_API_KEY' header",
  229. "support": "https://github.com/cpfiffer/letta-switchboard/issues"
  230. }
  231. @web_app.post("/schedules/recurring")
  232. async def create_recurring_schedule(schedule: RecurringScheduleCreate, credentials: HTTPAuthorizationCredentials = Security(security)):
  233. api_key = credentials.credentials
  234. if not validate_api_key(api_key):
  235. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  236. # Inject api_key from Authorization header
  237. schedule_obj = RecurringSchedule(api_key=api_key, **schedule.model_dump())
  238. schedule_dict = schedule_obj.model_dump(mode='json')
  239. file_path = get_recurring_schedule_path(api_key, schedule_obj.id)
  240. save_schedule(file_path, schedule_dict)
  241. response_dict = schedule_dict.copy()
  242. response_dict.pop("api_key", None)
  243. return JSONResponse(content=response_dict, status_code=201)
  244. @web_app.post("/schedules/one-time")
  245. async def create_onetime_schedule(schedule: OneTimeScheduleCreate, credentials: HTTPAuthorizationCredentials = Security(security)):
  246. api_key = credentials.credentials
  247. if not validate_api_key(api_key):
  248. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  249. # Inject api_key from Authorization header
  250. schedule_obj = OneTimeSchedule(api_key=api_key, **schedule.model_dump())
  251. schedule_dict = schedule_obj.model_dump(mode='json')
  252. file_path = get_onetime_schedule_path(api_key, schedule.execute_at, schedule_obj.id)
  253. save_schedule(file_path, schedule_dict)
  254. response_dict = schedule_dict.copy()
  255. response_dict.pop("api_key", None)
  256. return JSONResponse(content=response_dict, status_code=201)
  257. @web_app.get("/schedules/recurring")
  258. async def list_recurring_schedules(credentials: HTTPAuthorizationCredentials = Security(security)):
  259. api_key = credentials.credentials
  260. if not validate_api_key(api_key):
  261. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  262. schedules = list_recurring_schedules_for_user(api_key)
  263. for schedule in schedules:
  264. schedule.pop("api_key", None)
  265. return JSONResponse(content=schedules)
  266. @web_app.get("/schedules/one-time")
  267. async def list_onetime_schedules(credentials: HTTPAuthorizationCredentials = Security(security)):
  268. api_key = credentials.credentials
  269. if not validate_api_key(api_key):
  270. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  271. schedules = list_onetime_schedules_for_user(api_key)
  272. for schedule in schedules:
  273. schedule.pop("api_key", None)
  274. return JSONResponse(content=schedules)
  275. @web_app.get("/schedules/recurring/{schedule_id}")
  276. async def get_recurring_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  277. api_key = credentials.credentials
  278. if not validate_api_key(api_key):
  279. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  280. file_path = get_recurring_schedule_path(api_key, schedule_id)
  281. schedule = load_schedule(file_path)
  282. if schedule is None:
  283. raise HTTPException(status_code=404, detail="Schedule not found")
  284. if schedule.get("api_key") != api_key:
  285. raise HTTPException(status_code=403, detail="Forbidden")
  286. schedule_copy = schedule.copy()
  287. schedule_copy.pop("api_key", None)
  288. return JSONResponse(content=schedule_copy)
  289. @web_app.get("/schedules/one-time/{schedule_id}")
  290. async def get_onetime_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  291. api_key = credentials.credentials
  292. if not validate_api_key(api_key):
  293. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  294. schedule, _ = find_onetime_schedule_for_user(api_key, schedule_id)
  295. if schedule is None:
  296. raise HTTPException(status_code=404, detail="Schedule not found")
  297. if schedule.get("api_key") != api_key:
  298. raise HTTPException(status_code=403, detail="Forbidden")
  299. schedule_copy = schedule.copy()
  300. schedule_copy.pop("api_key", None)
  301. return JSONResponse(content=schedule_copy)
  302. @web_app.delete("/schedules/recurring/{schedule_id}")
  303. async def delete_recurring_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  304. api_key = credentials.credentials
  305. if not validate_api_key(api_key):
  306. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  307. file_path = get_recurring_schedule_path(api_key, schedule_id)
  308. schedule = load_schedule(file_path)
  309. if schedule is None:
  310. raise HTTPException(status_code=404, detail="Schedule not found")
  311. if schedule.get("api_key") != api_key:
  312. raise HTTPException(status_code=403, detail="Forbidden")
  313. if delete_schedule(file_path):
  314. return JSONResponse(content={"message": "Schedule deleted"})
  315. else:
  316. raise HTTPException(status_code=404, detail="Schedule not found")
  317. @web_app.delete("/schedules/one-time/{schedule_id}")
  318. async def delete_onetime_schedule(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  319. api_key = credentials.credentials
  320. if not validate_api_key(api_key):
  321. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  322. schedule, file_path = find_onetime_schedule_for_user(api_key, schedule_id)
  323. if schedule is None:
  324. raise HTTPException(status_code=404, detail="Schedule not found")
  325. if schedule.get("api_key") != api_key:
  326. raise HTTPException(status_code=403, detail="Forbidden")
  327. if delete_schedule(file_path):
  328. return JSONResponse(content={"message": "Schedule deleted"})
  329. else:
  330. raise HTTPException(status_code=404, detail="Schedule not found")
  331. @web_app.get("/results")
  332. async def list_execution_results(credentials: HTTPAuthorizationCredentials = Security(security)):
  333. """List all execution results for the authenticated user."""
  334. api_key = credentials.credentials
  335. if not validate_api_key(api_key):
  336. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  337. api_key_hash = get_api_key_hash(api_key)
  338. results_dir = f"{RESULTS_BASE}/{api_key_hash}"
  339. results = []
  340. if Path(results_dir).exists():
  341. for result_file in Path(results_dir).glob("*.json"):
  342. try:
  343. with open(result_file, "rb") as f:
  344. encrypted_data = f.read()
  345. result = decrypt_json(encrypted_data, get_encryption_key_cached())
  346. results.append(result)
  347. except Exception as e:
  348. logger.error(f"Failed to load result {result_file}: {e}")
  349. return JSONResponse(content=results)
  350. @web_app.get("/results/{schedule_id}")
  351. async def get_execution_result(schedule_id: str, credentials: HTTPAuthorizationCredentials = Security(security)):
  352. """Get execution result for a specific schedule."""
  353. api_key = credentials.credentials
  354. if not validate_api_key(api_key):
  355. raise HTTPException(status_code=401, detail="Invalid Letta API key")
  356. api_key_hash = get_api_key_hash(api_key)
  357. result_file = f"{RESULTS_BASE}/{api_key_hash}/{schedule_id}.json"
  358. if not Path(result_file).exists():
  359. raise HTTPException(status_code=404, detail="Result not found")
  360. try:
  361. with open(result_file, "rb") as f:
  362. encrypted_data = f.read()
  363. result = decrypt_json(encrypted_data, get_encryption_key_cached())
  364. return JSONResponse(content=result)
  365. except Exception as e:
  366. logger.error(f"Failed to load result: {e}")
  367. raise HTTPException(status_code=500, detail="Failed to load result")
  368. @app.function(image=image, volumes={VOLUME_PATH: volume}, secrets=[encryption_secret] if encryption_secret else [])
  369. @modal.asgi_app()
  370. def api():
  371. return web_app
  372. @app.function(image=image, volumes={VOLUME_PATH: volume}, secrets=[encryption_secret] if encryption_secret else [])
  373. async def execute_schedule(
  374. schedule_id: str,
  375. agent_id: str,
  376. api_key: str,
  377. message: str,
  378. role: str,
  379. schedule_type: str,
  380. execute_at: str = None,
  381. ):
  382. logger.info(f"Executing {schedule_type} schedule {schedule_id} for agent {agent_id}")
  383. # Reload volume to see latest changes (in case schedule was deleted)
  384. volume.reload()
  385. # Check if schedule still exists before executing
  386. if schedule_type == "one-time" and execute_at:
  387. file_path = get_onetime_schedule_path(api_key, execute_at, schedule_id)
  388. else:
  389. file_path = get_recurring_schedule_path(api_key, schedule_id)
  390. schedule = load_schedule(file_path)
  391. if not schedule:
  392. # For one-time schedules, this is expected (already executed or deleted)
  393. # For recurring schedules, this indicates the schedule was deleted by the user
  394. if schedule_type == "one-time":
  395. logger.debug(f"One-time schedule {schedule_id} not found (already executed or deleted)")
  396. else:
  397. logger.info(f"Recurring schedule {schedule_id} was deleted, skipping execution")
  398. return {"success": False, "error": "Schedule deleted"}
  399. # For one-time schedules: DELETE IMMEDIATELY to prevent race condition
  400. # Filesystem becomes source of truth: if file exists, it hasn't run
  401. if schedule_type == "one-time":
  402. try:
  403. Path(file_path).unlink()
  404. volume.commit()
  405. logger.info(f"Deleted one-time schedule {schedule_id} to prevent re-execution")
  406. except Exception as e:
  407. logger.error(f"Failed to delete schedule {schedule_id}, may re-execute: {e}")
  408. return {"success": False, "error": "Could not lock schedule for execution"}
  409. # For recurring schedules: update last_run timestamp
  410. elif schedule_type == "recurring":
  411. schedule["last_run"] = datetime.utcnow().isoformat()
  412. save_schedule(file_path, schedule)
  413. logger.info(f"Updated last_run for recurring schedule {schedule_id}")
  414. # Execute the message
  415. result = await execute_letta_message(agent_id, api_key, message, role)
  416. # Save execution result if successful
  417. if result.get("success") and result.get("run_id"):
  418. save_execution_result(
  419. api_key=api_key,
  420. schedule_id=schedule_id,
  421. run_id=result["run_id"],
  422. schedule_type=schedule_type,
  423. agent_id=agent_id,
  424. message=message,
  425. )
  426. return result
  427. def save_execution_result(api_key: str, schedule_id: str, run_id: str, schedule_type: str, agent_id: str, message: str):
  428. """Save execution result to results folder."""
  429. api_key_hash = get_api_key_hash(api_key)
  430. result_dir = f"{RESULTS_BASE}/{api_key_hash}"
  431. Path(result_dir).mkdir(parents=True, exist_ok=True)
  432. result_file = f"{result_dir}/{schedule_id}.json"
  433. result_data = {
  434. "schedule_id": schedule_id,
  435. "schedule_type": schedule_type,
  436. "run_id": run_id,
  437. "agent_id": agent_id,
  438. "message": message,
  439. "executed_at": datetime.utcnow().isoformat(),
  440. }
  441. encrypted_data = encrypt_json(result_data, get_encryption_key_cached())
  442. with open(result_file, "wb") as f:
  443. f.write(encrypted_data)
  444. volume.commit()
  445. logger.info(f"Saved execution result for schedule {schedule_id}, run_id: {run_id}")
  446. def cleanup_empty_directories():
  447. """Remove empty directories to keep filesystem clean."""
  448. removed_count = 0
  449. # Clean up one-time schedule directories (date/hour/user structure)
  450. onetime_base = f"{SCHEDULES_BASE}/one-time"
  451. if Path(onetime_base).exists():
  452. for date_dir in Path(onetime_base).iterdir():
  453. if not date_dir.is_dir():
  454. continue
  455. for hour_dir in date_dir.iterdir():
  456. if not hour_dir.is_dir():
  457. continue
  458. # Remove empty user directories
  459. for user_dir in hour_dir.iterdir():
  460. if user_dir.is_dir() and not any(user_dir.iterdir()):
  461. user_dir.rmdir()
  462. removed_count += 1
  463. logger.debug(f"Removed empty directory: {user_dir}")
  464. # Remove empty hour directory
  465. if not any(hour_dir.iterdir()):
  466. hour_dir.rmdir()
  467. removed_count += 1
  468. logger.debug(f"Removed empty directory: {hour_dir}")
  469. # Remove empty date directory
  470. if not any(date_dir.iterdir()):
  471. date_dir.rmdir()
  472. removed_count += 1
  473. logger.debug(f"Removed empty directory: {date_dir}")
  474. # Clean up recurring schedule directories (user structure only)
  475. recurring_base = f"{SCHEDULES_BASE}/recurring"
  476. if Path(recurring_base).exists():
  477. for user_dir in Path(recurring_base).iterdir():
  478. if user_dir.is_dir() and not any(user_dir.iterdir()):
  479. user_dir.rmdir()
  480. removed_count += 1
  481. logger.debug(f"Removed empty directory: {user_dir}")
  482. if removed_count > 0:
  483. logger.info(f"Cleanup: Removed {removed_count} empty directories")
  484. volume.commit()
  485. @app.function(
  486. image=image,
  487. volumes={VOLUME_PATH: volume},
  488. secrets=[encryption_secret] if encryption_secret else [],
  489. schedule=modal.Cron("* * * * *"),
  490. )
  491. async def check_and_execute_schedules():
  492. logger.info("Checking schedules...")
  493. # Reload volume to see latest changes (deletes, updates, etc.)
  494. volume.reload()
  495. current_time = datetime.now(timezone.utc)
  496. # Check recurring schedules (all users)
  497. recurring_schedules = list_all_recurring_schedules()
  498. for schedule in recurring_schedules:
  499. if is_recurring_schedule_due(schedule, current_time):
  500. logger.info(f"Executing recurring schedule {schedule['id']}")
  501. execute_schedule.spawn(
  502. schedule_id=schedule["id"],
  503. agent_id=schedule["agent_id"],
  504. api_key=schedule["api_key"],
  505. message=schedule["message"],
  506. role=schedule["role"],
  507. schedule_type="recurring",
  508. )
  509. # Check one-time schedules (only current date/hour bucket)
  510. date_str = current_time.strftime("%Y-%m-%d")
  511. hour_str = current_time.strftime("%H")
  512. onetime_schedules = list_onetime_schedules_for_time(date_str, hour_str)
  513. for schedule in onetime_schedules:
  514. if is_onetime_schedule_due(schedule, current_time):
  515. logger.info(f"Executing one-time schedule {schedule['id']}")
  516. execute_schedule.spawn(
  517. schedule_id=schedule["id"],
  518. agent_id=schedule["agent_id"],
  519. api_key=schedule["api_key"],
  520. message=schedule["message"],
  521. role=schedule["role"],
  522. schedule_type="one-time",
  523. execute_at=schedule["execute_at"],
  524. )
  525. # Clean up empty directories
  526. cleanup_empty_directories()
  527. logger.info("Schedule check complete")