letta_executor.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. from letta_client import Letta, MessageCreate, TextContent
  2. import logging
  3. logger = logging.getLogger(__name__)
  4. def validate_api_key(api_key: str) -> bool:
  5. try:
  6. client = Letta(token=api_key)
  7. client.agents.list(limit=1)
  8. return True
  9. except Exception as e:
  10. logger.error(f"API key validation failed: {str(e)}")
  11. return False
  12. async def execute_letta_message(agent_id: str, api_key: str, message: str, role: str = "user"):
  13. try:
  14. client = Letta(token=api_key)
  15. # Use create_async() with proper MessageCreate objects
  16. run = client.agents.messages.create_async(
  17. agent_id=agent_id,
  18. messages=[
  19. MessageCreate(
  20. role=role,
  21. content=[
  22. TextContent(text=message)
  23. ]
  24. )
  25. ]
  26. )
  27. logger.info(f"Successfully queued message for agent {agent_id}, run_id: {run.id}")
  28. return {"success": True, "run_id": run.id}
  29. except Exception as e:
  30. logger.error(f"Failed to send message to agent {agent_id}: {str(e)}")
  31. return {"success": False, "error": str(e)}