Browse Source

Save failed execution results and terminate schedules on failure

Previously, when messages failed to send (401 Unauthorized, network errors, etc.):
- One-time: Schedule deleted, no record of failure
- Recurring: Kept retrying forever with invalid credentials

Now:
- Always save execution results (success or failure)
- Failed results include status="failed" and error message
- Recurring schedules are terminated on first failure (no retries)

Changes:
1. Updated save_execution_result() to support status and error fields
   - status: "success" or "failed"
   - error: Error message (only present on failure)
   - run_id: Only present on success

2. Executor now saves results in both cases:
   - Success: Saves with run_id and status="success"
   - Failure: Saves with error message and status="failed"

3. Recurring schedules terminate on failure:
   - Deleted from filesystem immediately
   - Logged with warning
   - Users can check GET /results/{id} to see what went wrong

4. Results API automatically returns new fields:
   - GET /results shows all results with status
   - GET /results/{id} shows specific result with error if failed

Benefits:
- Users can see why their schedules failed
- No infinite retry loops with invalid credentials
- Clear audit trail of all execution attempts
- Failed schedules don't waste resources

Example failed result (executed_at is a naive UTC ISO timestamp, no "Z" suffix):
{
  "schedule_id": "abc-123",
  "schedule_type": "recurring",
  "status": "failed",
  "error": "401 Unauthorized",
  "agent_id": "agent-xxx",
  "message": "Test message",
  "executed_at": "2025-11-12T22:30:00.123456"
}

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>
Cameron Pfiffer 5 months ago
parent
commit
b1148cb0a1
1 changed file with 39 additions and 6 deletions
  1. 39 6
      app.py

+ 39 - 6
app.py

@@ -658,22 +658,47 @@ async def execute_schedule(
     # Execute the message
     result = await execute_letta_message(agent_id, api_key, message, role)
     
-    # Save execution result if successful
-    if result.get("success") and result.get("run_id"):
+    # Always save execution result (success or failure)
+    if result.get("success"):
+        # Successful execution
         save_execution_result(
             api_key=api_key,
             schedule_id=schedule_id,
-            run_id=result["run_id"],
             schedule_type=schedule_type,
             agent_id=agent_id,
             message=message,
+            run_id=result.get("run_id"),
+            status="success"
         )
+    else:
+        # Failed execution - save error result
+        error_msg = result.get("error", "Unknown error")
+        logger.error(f"Execution failed for schedule {schedule_id}: {error_msg}")
+        
+        save_execution_result(
+            api_key=api_key,
+            schedule_id=schedule_id,
+            schedule_type=schedule_type,
+            agent_id=agent_id,
+            message=message,
+            error=error_msg,
+            status="failed"
+        )
+        
+        # Terminate recurring schedules on failure (no retries)
+        if schedule_type == "recurring":
+            try:
+                Path(file_path).unlink()
+                volume.commit()
+                logger.warning(f"Terminated recurring schedule {schedule_id} due to execution failure: {error_msg}")
+            except Exception as e:
+                logger.error(f"Failed to delete failed recurring schedule {schedule_id}: {e}")
     
     return result
 
 
-def save_execution_result(api_key: str, schedule_id: str, run_id: str, schedule_type: str, agent_id: str, message: str):
-    """Save execution result to results folder."""
+def save_execution_result(api_key: str, schedule_id: str, schedule_type: str, agent_id: str, message: str, run_id: str = None, error: str = None, status: str = "success"):
+    """Save execution result to results folder (success or failure)."""
     api_key_hash = get_api_key_hash(api_key)
     result_dir = f"{RESULTS_BASE}/{api_key_hash}"
     Path(result_dir).mkdir(parents=True, exist_ok=True)
@@ -683,12 +708,20 @@ def save_execution_result(api_key: str, schedule_id: str, run_id: str, schedule_
     result_data = {
         "schedule_id": schedule_id,
         "schedule_type": schedule_type,
-        "run_id": run_id,
+        "status": status,
         "agent_id": agent_id,
         "message": message,
         "executed_at": datetime.utcnow().isoformat(),
     }
     
+    # Add run_id only if present (successful execution)
+    if run_id:
+        result_data["run_id"] = run_id
+    
+    # Add error only if present (failed execution)
+    if error:
+        result_data["error"] = error
+    
     encrypted_data = encrypt_json(result_data, get_encryption_key_cached())
     with open(result_file, "wb") as f:
         f.write(encrypted_data)