letta_executor.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. from letta_client import Letta, MessageCreate, TextContent
  2. import logging
  3. import json
  4. logger = logging.getLogger(__name__)
  5. def parse_error(e: Exception) -> dict:
  6. """Extract error message and metadata from exceptions."""
  7. error_str = str(e)
  8. result = {"message": error_str, "permanent": False}
  9. # Check for common timeout errors
  10. if "timed out" in error_str.lower():
  11. result["message"] = "Request timed out"
  12. return result
  13. # Check for connection errors
  14. if "connection" in error_str.lower() and "error" in error_str.lower():
  15. result["message"] = "Connection error"
  16. return result
  17. # Try to extract body from HTTP errors (letta_client format)
  18. if "body:" in error_str:
  19. try:
  20. # Extract the body part
  21. body_start = error_str.find("body:") + 5
  22. body_str = error_str[body_start:].strip()
  23. body = eval(body_str) # Safe here since it's from our own API response
  24. if isinstance(body, dict):
  25. error_msg = body.get("error", "")
  26. reasons = body.get("reasons", [])
  27. status_code = None
  28. # Try to get status code
  29. if "status_code:" in error_str:
  30. try:
  31. sc_start = error_str.find("status_code:") + 12
  32. sc_end = error_str.find(",", sc_start)
  33. status_code = int(error_str[sc_start:sc_end].strip())
  34. except (ValueError, IndexError):
  35. pass
  36. # For auth errors - permanent, should remove schedule
  37. if status_code in (401, 403):
  38. result["message"] = f"Authentication failed: {error_msg}"
  39. result["permanent"] = True
  40. return result
  41. # For not found - permanent, should remove schedule
  42. if status_code == 404 or "not-found" in str(reasons).lower():
  43. result["message"] = "Agent not found"
  44. result["permanent"] = True
  45. return result
  46. # For rate limits - transient, keep schedule
  47. if status_code == 429 or error_msg == "Rate limited":
  48. if reasons:
  49. result["message"] = f"Rate limited: {', '.join(reasons)}"
  50. else:
  51. result["message"] = "Rate limited"
  52. return result
  53. # Generic API error
  54. if error_msg:
  55. if reasons:
  56. result["message"] = f"{error_msg}: {', '.join(reasons)}"
  57. else:
  58. result["message"] = error_msg
  59. return result
  60. except Exception:
  61. pass
  62. # Fallback: truncate if too long
  63. if len(error_str) > 100:
  64. result["message"] = error_str[:100] + "..."
  65. return result
  66. def validate_api_key(api_key: str) -> bool:
  67. try:
  68. client = Letta(token=api_key)
  69. client.agents.list(limit=1)
  70. return True
  71. except Exception as e:
  72. error_info = parse_error(e)
  73. logger.error(f"API key validation failed: {error_info['message']}")
  74. return False
  75. async def execute_letta_message(agent_id: str, api_key: str, message: str, role: str = "user"):
  76. try:
  77. client = Letta(token=api_key)
  78. # Use create_async() with proper MessageCreate objects
  79. run = client.agents.messages.create_async(
  80. agent_id=agent_id,
  81. messages=[
  82. MessageCreate(
  83. role=role,
  84. content=[
  85. TextContent(text=message)
  86. ]
  87. )
  88. ]
  89. )
  90. logger.info(f"Successfully queued message for agent {agent_id}, run_id: {run.id}")
  91. return {"success": True, "run_id": run.id}
  92. except Exception as e:
  93. error_info = parse_error(e)
  94. logger.error(f"Failed to send message to agent {agent_id}: {error_info['message']}")
  95. return {
  96. "success": False,
  97. "error": error_info["message"],
  98. "permanent": error_info["permanent"]
  99. }