| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- from letta_client import Letta, MessageCreate, TextContent
- import logging
- import json
- logger = logging.getLogger(__name__)
- def parse_error(e: Exception) -> dict:
- """Extract error message and metadata from exceptions."""
- error_str = str(e)
- result = {"message": error_str, "permanent": False}
- # Check for common timeout errors
- if "timed out" in error_str.lower():
- result["message"] = "Request timed out"
- return result
- # Check for connection errors
- if "connection" in error_str.lower() and "error" in error_str.lower():
- result["message"] = "Connection error"
- return result
- # Try to extract body from HTTP errors (letta_client format)
- if "body:" in error_str:
- try:
- # Extract the body part
- body_start = error_str.find("body:") + 5
- body_str = error_str[body_start:].strip()
- body = eval(body_str) # Safe here since it's from our own API response
- if isinstance(body, dict):
- error_msg = body.get("error", "")
- reasons = body.get("reasons", [])
- status_code = None
- # Try to get status code
- if "status_code:" in error_str:
- try:
- sc_start = error_str.find("status_code:") + 12
- sc_end = error_str.find(",", sc_start)
- status_code = int(error_str[sc_start:sc_end].strip())
- except (ValueError, IndexError):
- pass
- # For auth errors - permanent, should remove schedule
- if status_code in (401, 403):
- result["message"] = f"Authentication failed: {error_msg}"
- result["permanent"] = True
- return result
- # For not found - permanent, should remove schedule
- if status_code == 404 or "not-found" in str(reasons).lower():
- result["message"] = "Agent not found"
- result["permanent"] = True
- return result
- # For rate limits - transient, keep schedule
- if status_code == 429 or error_msg == "Rate limited":
- if reasons:
- result["message"] = f"Rate limited: {', '.join(reasons)}"
- else:
- result["message"] = "Rate limited"
- return result
- # Generic API error
- if error_msg:
- if reasons:
- result["message"] = f"{error_msg}: {', '.join(reasons)}"
- else:
- result["message"] = error_msg
- return result
- except Exception:
- pass
- # Fallback: truncate if too long
- if len(error_str) > 100:
- result["message"] = error_str[:100] + "..."
- return result
- def validate_api_key(api_key: str) -> bool:
- try:
- client = Letta(token=api_key)
- client.agents.list(limit=1)
- return True
- except Exception as e:
- error_info = parse_error(e)
- logger.error(f"API key validation failed: {error_info['message']}")
- return False
- async def execute_letta_message(agent_id: str, api_key: str, message: str, role: str = "user"):
- try:
- client = Letta(token=api_key)
- # Use create_async() with proper MessageCreate objects
- run = client.agents.messages.create_async(
- agent_id=agent_id,
- messages=[
- MessageCreate(
- role=role,
- content=[
- TextContent(text=message)
- ]
- )
- ]
- )
- logger.info(f"Successfully queued message for agent {agent_id}, run_id: {run.id}")
- return {"success": True, "run_id": run.id}
- except Exception as e:
- error_info = parse_error(e)
- logger.error(f"Failed to send message to agent {agent_id}: {error_info['message']}")
- return {
- "success": False,
- "error": error_info["message"],
- "permanent": error_info["permanent"]
- }
|