Browse Source

Remove deprecated backend logic files

Deleted files that are no longer needed after migrating to Letta's native scheduling:

Removed:
- letta_executor.py - Letta now handles execution
- scheduler.py - Letta now handles cron checking
- crypto_utils.py - No file storage needed
- models.py - Using Letta's API models
- test_api.py - Old API tests no longer relevant
- tests/ - Old test suite for deprecated backend

The new app.py has zero dependencies on these files.
All scheduling logic is now handled by Letta Cloud.

Result: Removed ~500 lines of code that's no longer needed.

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>
Cameron Pfiffer 3 months ago
parent
commit
5e24f9ee74
10 changed files with 0 additions and 1460 deletions
  1. 0 53
      crypto_utils.py
  2. 0 120
      letta_executor.py
  3. 0 41
      models.py
  4. 0 55
      scheduler.py
  5. 0 238
      test_api.py
  6. 0 212
      tests/README.md
  7. 0 107
      tests/conftest.py
  8. 0 378
      tests/test_api_e2e.py
  9. 0 133
      tests/test_crypto_utils.py
  10. 0 123
      tests/test_scheduler.py

+ 0 - 53
crypto_utils.py

@@ -1,53 +0,0 @@
-import hashlib
-import json
-from cryptography.fernet import Fernet
-import logging
-import os
-
-logger = logging.getLogger(__name__)
-
-
-def get_api_key_hash(api_key: str) -> str:
-    """Generate a hash of the API key for directory naming."""
-    return hashlib.sha256(api_key.encode()).hexdigest()[:16]
-
-
-def is_dev_mode() -> bool:
-    """Check if we're in dev mode (no encryption)."""
-    return os.getenv("LETTA_SWITCHBOARD_DEV_MODE", "").lower() in ("true", "1", "yes")
-
-
-def get_encryption_key() -> bytes:
-    """Get or generate encryption key for JSON encryption."""
-    key = os.getenv("LETTA_SWITCHBOARD_ENCRYPTION_KEY")
-    
-    if is_dev_mode():
-        logger.warning("🔓 DEV MODE: Encryption disabled - files stored in plaintext")
-        return b"dev-mode-no-encryption"
-    
-    if not key:
-        logger.warning("LETTA_SWITCHBOARD_ENCRYPTION_KEY not set, generating temporary key")
-        key = Fernet.generate_key().decode()
-    
-    return key.encode()
-
-
-def encrypt_json(data: dict, encryption_key: bytes) -> bytes:
-    """Encrypt a JSON object (or return plaintext in dev mode)."""
-    json_bytes = json.dumps(data, default=str, indent=2).encode()
-    
-    if is_dev_mode():
-        return json_bytes
-    
-    fernet = Fernet(encryption_key)
-    return fernet.encrypt(json_bytes)
-
-
-def decrypt_json(encrypted_data: bytes, encryption_key: bytes) -> dict:
-    """Decrypt an encrypted JSON object (or parse plaintext in dev mode)."""
-    if is_dev_mode():
-        return json.loads(encrypted_data)
-    
-    fernet = Fernet(encryption_key)
-    json_bytes = fernet.decrypt(encrypted_data)
-    return json.loads(json_bytes)

+ 0 - 120
letta_executor.py

@@ -1,120 +0,0 @@
-from letta_client import Letta, MessageCreate, TextContent
-import logging
-import json
-
-logger = logging.getLogger(__name__)
-
-
-def parse_error(e: Exception) -> dict:
-    """Extract error message and metadata from exceptions."""
-    error_str = str(e)
-    result = {"message": error_str, "permanent": False}
-
-    # Check for common timeout errors
-    if "timed out" in error_str.lower():
-        result["message"] = "Request timed out"
-        return result
-
-    # Check for connection errors
-    if "connection" in error_str.lower() and "error" in error_str.lower():
-        result["message"] = "Connection error"
-        return result
-
-    # Try to extract body from HTTP errors (letta_client format)
-    if "body:" in error_str:
-        try:
-            # Extract the body part
-            body_start = error_str.find("body:") + 5
-            body_str = error_str[body_start:].strip()
-            body = eval(body_str)  # Safe here since it's from our own API response
-
-            if isinstance(body, dict):
-                error_msg = body.get("error", "")
-                reasons = body.get("reasons", [])
-                status_code = None
-
-                # Try to get status code
-                if "status_code:" in error_str:
-                    try:
-                        sc_start = error_str.find("status_code:") + 12
-                        sc_end = error_str.find(",", sc_start)
-                        status_code = int(error_str[sc_start:sc_end].strip())
-                    except (ValueError, IndexError):
-                        pass
-
-                # For auth errors - permanent, should remove schedule
-                if status_code in (401, 403):
-                    result["message"] = f"Authentication failed: {error_msg}"
-                    result["permanent"] = True
-                    return result
-
-                # For not found - permanent, should remove schedule
-                if status_code == 404 or "not-found" in str(reasons).lower():
-                    result["message"] = "Agent not found"
-                    result["permanent"] = True
-                    return result
-
-                # For rate limits - transient, keep schedule
-                if status_code == 429 or error_msg == "Rate limited":
-                    if reasons:
-                        result["message"] = f"Rate limited: {', '.join(reasons)}"
-                    else:
-                        result["message"] = "Rate limited"
-                    return result
-
-                # Generic API error
-                if error_msg:
-                    if reasons:
-                        result["message"] = f"{error_msg}: {', '.join(reasons)}"
-                    else:
-                        result["message"] = error_msg
-                    return result
-        except Exception:
-            pass
-
-    # Fallback: truncate if too long
-    if len(error_str) > 100:
-        result["message"] = error_str[:100] + "..."
-
-    return result
-
-
-def validate_api_key(api_key: str) -> bool:
-    try:
-        client = Letta(token=api_key)
-        client.agents.list(limit=1)
-        return True
-    except Exception as e:
-        error_info = parse_error(e)
-        logger.error(f"API key validation failed: {error_info['message']}")
-        return False
-
-
-async def execute_letta_message(agent_id: str, api_key: str, message: str, role: str = "user"):
-    try:
-        client = Letta(token=api_key)
-
-        # Use create_async() with proper MessageCreate objects
-        run = client.agents.messages.create_async(
-            agent_id=agent_id,
-            messages=[
-                MessageCreate(
-                    role=role,
-                    content=[
-                        TextContent(text=message)
-                    ]
-                )
-            ]
-        )
-
-        logger.info(f"Successfully queued message for agent {agent_id}, run_id: {run.id}")
-        return {"success": True, "run_id": run.id}
-
-    except Exception as e:
-        error_info = parse_error(e)
-        logger.error(f"Failed to send message to agent {agent_id}: {error_info['message']}")
-        return {
-            "success": False,
-            "error": error_info["message"],
-            "permanent": error_info["permanent"]
-        }

+ 0 - 41
models.py

@@ -1,41 +0,0 @@
-from pydantic import BaseModel, Field
-from typing import Optional
-from datetime import datetime
-import uuid
-
-
-class RecurringScheduleCreate(BaseModel):
-    agent_id: str
-    cron: str
-    message: str
-    role: str = "user"
-    timezone: str = "UTC"
-
-
-class RecurringSchedule(BaseModel):
-    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
-    agent_id: str
-    api_key: str
-    cron: str
-    message: str
-    role: str = "user"
-    timezone: str = "UTC"
-    created_at: datetime = Field(default_factory=datetime.utcnow)
-    last_run: Optional[datetime] = None
-
-
-class OneTimeScheduleCreate(BaseModel):
-    agent_id: str
-    execute_at: str
-    message: str
-    role: str = "user"
-
-
-class OneTimeSchedule(BaseModel):
-    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
-    agent_id: str
-    api_key: str
-    execute_at: str
-    message: str
-    role: str = "user"
-    created_at: datetime = Field(default_factory=datetime.utcnow)

+ 0 - 55
scheduler.py

@@ -1,55 +0,0 @@
-from croniter import croniter
-from datetime import datetime, timezone
-from dateutil import parser
-from zoneinfo import ZoneInfo
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-def is_recurring_schedule_due(schedule_dict: dict, current_time: datetime) -> bool:
-    cron_expression = schedule_dict["cron"]
-    last_run = schedule_dict.get("last_run")
-    schedule_tz_str = schedule_dict.get("timezone", "UTC")
-
-    # Get the schedule's timezone
-    try:
-        schedule_tz = ZoneInfo(schedule_tz_str)
-    except Exception:
-        schedule_tz = ZoneInfo("UTC")
-
-    if last_run:
-        last_run_dt = parser.parse(last_run)
-    else:
-        last_run_dt = parser.parse(schedule_dict["created_at"])
-
-    if last_run_dt.tzinfo is None:
-        last_run_dt = last_run_dt.replace(tzinfo=timezone.utc)
-
-    if current_time.tzinfo is None:
-        current_time = current_time.replace(tzinfo=timezone.utc)
-
-    # Convert current time to schedule's timezone for cron evaluation
-    current_time_in_tz = current_time.astimezone(schedule_tz)
-    last_run_in_tz = last_run_dt.astimezone(schedule_tz)
-
-    cron = croniter(cron_expression, last_run_in_tz)
-    next_run = cron.get_next(datetime)
-
-    return current_time_in_tz >= next_run
-
-
-def is_onetime_schedule_due(schedule_dict: dict, current_time: datetime) -> bool:
-    if schedule_dict.get("executed", False):
-        return False
-    
-    execute_at_str = schedule_dict["execute_at"]
-    execute_at = parser.parse(execute_at_str)
-    
-    if execute_at.tzinfo is None:
-        execute_at = execute_at.replace(tzinfo=timezone.utc)
-    
-    if current_time.tzinfo is None:
-        current_time = current_time.replace(tzinfo=timezone.utc)
-    
-    return current_time >= execute_at

+ 0 - 238
test_api.py

@@ -1,238 +0,0 @@
-#!/usr/bin/env python3
-import requests
-import os
-import json
-from datetime import datetime, timedelta, timezone
-
-BASE_URL = os.getenv("LETTA_SWITCHBOARD_URL", "https://letta--letta-switchboard-api-dev.modal.run")
-API_KEY = os.getenv("LETTA_API_KEY")
-AGENT_ID = os.getenv("LETTA_AGENT_ID", "agent-a29146cc-2fb3-452d-8c0c-bf71e5db609a")
-
-if not API_KEY:
-    print("ERROR: LETTA_API_KEY environment variable not set!")
-    print("This must be a VALID Letta API key that will be validated against Letta's API.")
-    print("Set it with: export LETTA_API_KEY=sk-...")
-    exit(1)
-
-print("Configuration:")
-print(f"  Base URL: {BASE_URL}")
-print(f"  Agent ID: {AGENT_ID}")
-print(f"  API Key: {API_KEY[:20]}..." if API_KEY else "  API Key: Not set")
-
-
-def test_create_recurring_schedule():
-    print("\n=== Testing: Create Recurring Schedule ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    payload = {
-        "agent_id": AGENT_ID,
-        "cron": "*/5 * * * *",
-        "message": "This is a test recurring message every 5 minutes",
-        "role": "user"
-    }
-    
-    response = requests.post(f"{BASE_URL}/schedules/recurring", json=payload, headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 201:
-        data = response.json()
-        print(f"Created schedule ID: {data['id']}")
-        print(json.dumps(data, indent=2))
-        return data['id']
-    else:
-        print(f"Error: {response.text}")
-        return None
-
-
-def test_create_onetime_schedule():
-    print("\n=== Testing: Create One-Time Schedule ===")
-    
-    execute_time = datetime.now(timezone.utc) + timedelta(minutes=1)
-    execute_time_str = execute_time.isoformat()
-    
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    payload = {
-        "agent_id": AGENT_ID,
-        "execute_at": execute_time_str,
-        "message": f"This is a test one-time message scheduled for {execute_time_str}",
-        "role": "user"
-    }
-    
-    response = requests.post(f"{BASE_URL}/schedules/one-time", json=payload, headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 201:
-        data = response.json()
-        print(f"Created schedule ID: {data['id']}")
-        print(json.dumps(data, indent=2))
-        return data['id']
-    else:
-        print(f"Error: {response.text}")
-        return None
-
-
-def test_list_recurring_schedules():
-    print("\n=== Testing: List Recurring Schedules ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.get(f"{BASE_URL}/schedules/recurring", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        data = response.json()
-        print(f"Found {len(data)} recurring schedules")
-        for schedule in data:
-            print(f"  - ID: {schedule['id']}, Cron: {schedule['cron']}")
-    else:
-        print(f"Error: {response.text}")
-
-
-def test_list_onetime_schedules():
-    print("\n=== Testing: List One-Time Schedules ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.get(f"{BASE_URL}/schedules/one-time", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        data = response.json()
-        print(f"Found {len(data)} one-time schedules")
-        for schedule in data:
-            print(f"  - ID: {schedule['id']}, Execute at: {schedule['execute_at']}")
-    else:
-        print(f"Error: {response.text}")
-
-
-def test_list_results():
-    print("\n=== Testing: List Execution Results ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.get(f"{BASE_URL}/results", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        data = response.json()
-        print(f"Found {len(data)} execution results")
-        for result in data:
-            print(f"  - Schedule ID: {result['schedule_id']}")
-            print(f"    Type: {result['schedule_type']}")
-            print(f"    Run ID: {result['run_id']}")
-            print(f"    Executed at: {result['executed_at']}")
-    else:
-        print(f"Error: {response.text}")
-
-
-def test_get_result(schedule_id):
-    print(f"\n=== Testing: Get Execution Result for {schedule_id} ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.get(f"{BASE_URL}/results/{schedule_id}", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        data = response.json()
-        print(f"Run ID: {data['run_id']}")
-        print(json.dumps(data, indent=2))
-    elif response.status_code == 404:
-        print("No execution result yet (schedule may not have executed)")
-    else:
-        print(f"Error: {response.text}")
-
-
-def test_get_recurring_schedule(schedule_id):
-    print(f"\n=== Testing: Get Recurring Schedule {schedule_id} ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.get(f"{BASE_URL}/schedules/recurring/{schedule_id}", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        data = response.json()
-        print(json.dumps(data, indent=2))
-    else:
-        print(f"Error: {response.text}")
-
-
-def test_get_onetime_schedule(schedule_id):
-    print(f"\n=== Testing: Get One-Time Schedule {schedule_id} ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.get(f"{BASE_URL}/schedules/one-time/{schedule_id}", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        data = response.json()
-        print(json.dumps(data, indent=2))
-    else:
-        print(f"Error: {response.text}")
-
-
-def test_delete_recurring_schedule(schedule_id):
-    print(f"\n=== Testing: Delete Recurring Schedule {schedule_id} ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.delete(f"{BASE_URL}/schedules/recurring/{schedule_id}", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        print("Schedule deleted successfully")
-    else:
-        print(f"Error: {response.text}")
-
-
-def test_delete_onetime_schedule(schedule_id):
-    print(f"\n=== Testing: Delete One-Time Schedule {schedule_id} ===")
-    headers = {"Authorization": f"Bearer {API_KEY}"}
-    response = requests.delete(f"{BASE_URL}/schedules/one-time/{schedule_id}", headers=headers)
-    print(f"Status: {response.status_code}")
-    
-    if response.status_code == 200:
-        print("Schedule deleted successfully")
-    else:
-        print(f"Error: {response.text}")
-
-
-def main():
-    print("=" * 60)
-    print("Letta Schedules API Test Suite")
-    print("=" * 60)
-    print(f"Base URL: {BASE_URL}")
-    print(f"Agent ID: {AGENT_ID}")
-    
-    recurring_id = test_create_recurring_schedule()
-    onetime_id = test_create_onetime_schedule()
-    
-    test_list_recurring_schedules()
-    test_list_onetime_schedules()
-    
-    if recurring_id:
-        test_get_recurring_schedule(recurring_id)
-    
-    if onetime_id:
-        test_get_onetime_schedule(onetime_id)
-    
-    # Check execution results
-    test_list_results()
-    
-    if recurring_id:
-        test_get_result(recurring_id)
-    
-    if onetime_id:
-        test_get_result(onetime_id)
-    
-    input("\n\nPress Enter to delete test schedules...")
-    
-    if recurring_id:
-        test_delete_recurring_schedule(recurring_id)
-    
-    if onetime_id:
-        test_delete_onetime_schedule(onetime_id)
-    
-    test_list_recurring_schedules()
-    test_list_onetime_schedules()
-    
-    # Show final results
-    print("\n=== Final Results After Deletion ===")
-    test_list_results()
-    
-    print("\n" + "=" * 60)
-    print("Test suite complete!")
-    print("=" * 60)
-    print("\nNote: Execution results remain even after schedules are deleted.")
-    print("Check run status at: https://api.letta.com/v1/runs/{run_id}")
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 212
tests/README.md

@@ -1,212 +0,0 @@
-# Letta Schedules Test Suite
-
-Comprehensive test suite with unit tests and end-to-end integration tests.
-
-## Test Structure
-
-```
-tests/
-├── conftest.py              # Pytest fixtures and configuration
-├── test_crypto_utils.py     # Unit tests for encryption/hashing
-├── test_scheduler.py        # Unit tests for schedule logic
-├── test_api_e2e.py          # E2E API tests (requires running service)
-└── README.md                # This file
-```
-
-## Setup
-
-Install test dependencies:
-
-```bash
-pip install -r requirements-test.txt
-```
-
-## Running Tests
-
-### Unit Tests Only (Fast)
-
-Unit tests run without requiring a running Modal service:
-
-```bash
-# Run all unit tests
-pytest -m "not e2e"
-
-# Run specific unit test file
-pytest tests/test_crypto_utils.py
-pytest tests/test_scheduler.py
-
-# Run with verbose output
-pytest -v -m "not e2e"
-```
-
-### E2E Tests (Requires Running Service)
-
-E2E tests require a running Modal service and valid Letta credentials:
-
-**1. Start the service:**
-```bash
-export LETTA_SCHEDULES_DEV_MODE=true
-modal serve app.py
-```
-
-**2. In another terminal, set environment variables:**
-```bash
-export LETTA_API_KEY="sk-..."                              # Required: Valid Letta API key
-export LETTA_AGENT_ID="agent-xxx"                          # Required: Valid agent ID
-export LETTA_SCHEDULES_URL="https://your-modal-url"        # Optional: defaults to dev URL
-export LETTA_API_KEY_2="sk-different-key"                  # Optional: for multi-user tests
-```
-
-**3. Run E2E tests:**
-```bash
-# Run all E2E tests (excluding slow tests)
-pytest -m "e2e and not slow"
-
-# Run all E2E tests including slow ones
-pytest -m "e2e"
-
-# Run specific E2E test class
-pytest tests/test_api_e2e.py::TestAuthentication
-pytest tests/test_api_e2e.py::TestRecurringScheduleCRUD
-
-# Run execution tests (these take 60-90 seconds each)
-pytest -m "slow" -v
-```
-
-### Run All Tests
-
-```bash
-# Run everything (unit + e2e, excluding slow tests)
-pytest -m "not slow"
-
-# Run absolutely everything
-pytest
-```
-
-## Test Markers
-
-Tests are organized with pytest markers:
-
-- `@pytest.mark.unit` - Fast unit tests (no external dependencies)
-- `@pytest.mark.e2e` - End-to-end tests (requires running service)
-- `@pytest.mark.slow` - Tests that wait for cron execution (60-90s each)
-
-## Test Categories
-
-### 1. Unit Tests
-
-**test_crypto_utils.py:**
-- API key hashing (deterministic, correct length)
-- Dev mode detection
-- Encryption/decryption roundtrip
-- Dev mode plaintext storage
-- Production mode encryption
-
-**test_scheduler.py:**
-- Cron schedule due checking
-- One-time schedule due checking
-- Timezone handling
-- Edge cases (just ran, long ago, exact time)
-
-### 2. E2E API Tests
-
-**test_api_e2e.py:**
-
-**Authentication:**
-- Invalid API key → 401
-- Missing auth header → 403
-- API keys never in responses
-
-**Recurring Schedules CRUD:**
-- Create, list, get, delete operations
-- Authorization checks
-- 404 on non-existent schedules
-
-**One-Time Schedules CRUD:**
-- Create, list, get, delete operations
-- Timezone support
-- Authorization checks
-
-**Results:**
-- List execution results
-- Get specific result
-- Results persist after schedule deletion
-
-**Execution (Slow Tests):**
-- Past schedules execute immediately (<90s)
-- One-time schedules deleted after execution
-- No duplicate executions
-- Results created with run_id
-
-## Environment Variables
-
-**Required for E2E tests:**
-- `LETTA_API_KEY` - Valid Letta API key
-- `LETTA_AGENT_ID` - Valid Letta agent ID
-
-**Optional:**
-- `LETTA_SCHEDULES_URL` - Override API base URL (default: dev URL)
-- `LETTA_API_KEY_2` - Second API key for multi-user isolation tests
-- `LETTA_SCHEDULES_DEV_MODE` - Enable dev mode (default: true for tests)
-
-## CI/CD Integration
-
-For continuous integration, run only fast tests:
-
-```bash
-# Run unit tests + fast E2E tests
-pytest -m "not slow" --timeout=30
-
-# Generate coverage report
-pytest --cov=. --cov-report=html -m "not slow"
-```
-
-For full integration testing (slower):
-
-```bash
-# Run everything including execution tests
-pytest -v --timeout=120
-```
-
-## Troubleshooting
-
-**Tests fail with "LETTA_API_KEY not set":**
-- E2E tests require valid Letta credentials
-- Set environment variables or run only unit tests: `pytest -m "not e2e"`
-
-**Tests timeout:**
-- Execution tests wait up to 90 seconds for cron to run
-- Ensure Modal service is running: `modal serve app.py`
-- Check service logs: `modal app logs letta-schedules --follow`
-
-**"Service not reachable":**
-- Verify Modal service is running
-- Check `LETTA_SCHEDULES_URL` points to correct endpoint
-- Ensure service accepts connections: `curl <service-url>/schedules/recurring`
-
-**Dev mode warnings:**
-- Tests automatically enable dev mode via `LETTA_SCHEDULES_DEV_MODE=true`
-- No encryption key needed for local tests
-- Files stored in plaintext for easy inspection
-
-## Example Test Run
-
-```bash
-# Terminal 1: Start service
-export LETTA_SCHEDULES_DEV_MODE=true
-modal serve app.py
-
-# Terminal 2: Run tests
-export LETTA_API_KEY="sk-..."
-export LETTA_AGENT_ID="agent-..."
-pytest -v -m "e2e and not slow"
-```
-
-Expected output:
-```
-tests/test_api_e2e.py::TestAuthentication::test_invalid_api_key_returns_401 PASSED
-tests/test_api_e2e.py::TestRecurringScheduleCRUD::test_create_recurring_schedule PASSED
-tests/test_api_e2e.py::TestOneTimeScheduleCRUD::test_create_onetime_schedule PASSED
-...
-========== 15 passed in 12.34s ==========
-```

+ 0 - 107
tests/conftest.py

@@ -1,107 +0,0 @@
-import pytest
-import os
-import sys
-import tempfile
-import shutil
-from pathlib import Path
-from datetime import datetime, timezone, timedelta
-from cryptography.fernet import Fernet
-
-# Add parent directory to path for imports
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "true"
-
-
-@pytest.fixture
-def temp_volume_path():
-    """Create a temporary directory to simulate Modal volume."""
-    temp_dir = tempfile.mkdtemp()
-    yield temp_dir
-    shutil.rmtree(temp_dir)
-
-
-@pytest.fixture
-def mock_api_key():
-    """A mock API key for testing."""
-    return "test-api-key-12345"
-
-
-@pytest.fixture
-def mock_api_key_2():
-    """A second mock API key for multi-user testing."""
-    return "test-api-key-67890"
-
-
-@pytest.fixture
-def mock_agent_id():
-    """A mock agent ID for testing."""
-    return "agent-test-123"
-
-
-@pytest.fixture
-def encryption_key():
-    """Generate a test encryption key."""
-    return Fernet.generate_key()
-
-
-@pytest.fixture
-def test_recurring_schedule_data(mock_api_key, mock_agent_id):
-    """Sample recurring schedule data."""
-    return {
-        "agent_id": mock_agent_id,
-        "api_key": mock_api_key,
-        "cron": "*/5 * * * *",
-        "message": "Test recurring message",
-        "role": "user"
-    }
-
-
-@pytest.fixture
-def test_onetime_schedule_data(mock_api_key, mock_agent_id):
-    """Sample one-time schedule data."""
-    future_time = datetime.now(timezone.utc) + timedelta(minutes=5)
-    return {
-        "agent_id": mock_agent_id,
-        "api_key": mock_api_key,
-        "execute_at": future_time.isoformat(),
-        "message": "Test one-time message",
-        "role": "user"
-    }
-
-
-@pytest.fixture
-def past_onetime_schedule_data(mock_api_key, mock_agent_id):
-    """One-time schedule scheduled in the past (should execute immediately)."""
-    past_time = datetime.now(timezone.utc) - timedelta(minutes=5)
-    return {
-        "agent_id": mock_agent_id,
-        "api_key": mock_api_key,
-        "execute_at": past_time.isoformat(),
-        "message": "Test past message",
-        "role": "user"
-    }
-
-
-@pytest.fixture
-def api_base_url():
-    """Base URL for API testing. Override with LETTA_SWITCHBOARD_URL env var."""
-    return os.getenv("LETTA_SWITCHBOARD_URL", "https://letta--letta-switchboard-api-dev.modal.run")
-
-
-@pytest.fixture
-def valid_letta_api_key():
-    """Real Letta API key for E2E tests. Must be set via env var."""
-    api_key = os.getenv("LETTA_API_KEY")
-    if not api_key:
-        pytest.skip("LETTA_API_KEY not set - skipping E2E test")
-    return api_key
-
-
-@pytest.fixture
-def valid_letta_agent_id():
-    """Real Letta agent ID for E2E tests."""
-    agent_id = os.getenv("LETTA_AGENT_ID")
-    if not agent_id:
-        pytest.skip("LETTA_AGENT_ID not set - skipping E2E test")
-    return agent_id

+ 0 - 378
tests/test_api_e2e.py

@@ -1,378 +0,0 @@
-import pytest
-import requests
-import time
-import os
-from datetime import datetime, timezone, timedelta
-
-
-@pytest.mark.e2e
-class TestAuthentication:
-    def test_invalid_api_key_returns_401(self, api_base_url):
-        """Invalid API key should return 401 Unauthorized."""
-        headers = {"Authorization": "Bearer invalid-fake-key-123"}
-        response = requests.get(f"{api_base_url}/schedules/recurring", headers=headers)
-        assert response.status_code == 401
-    
-    def test_no_auth_header_returns_403(self, api_base_url):
-        """Missing auth header should return 403."""
-        response = requests.get(f"{api_base_url}/schedules/recurring")
-        assert response.status_code == 403
-    
-    def test_api_key_not_in_response(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """API key should never be returned in responses."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        
-        # Create schedule
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "cron": "*/5 * * * *",
-            "message": "Test message",
-            "role": "user"
-        }
-        response = requests.post(f"{api_base_url}/schedules/recurring", json=payload, headers=headers)
-        
-        # API key should not be in response
-        data = response.json()
-        assert "api_key" not in data
-        
-        # Clean up
-        schedule_id = data["id"]
-        requests.delete(f"{api_base_url}/schedules/recurring/{schedule_id}", headers=headers)
-
-
-@pytest.mark.e2e
-class TestRecurringScheduleCRUD:
-    def test_create_recurring_schedule(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """Should successfully create a recurring schedule."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "cron": "0 9 * * *",
-            "message": "Daily morning message",
-            "role": "user"
-        }
-        
-        response = requests.post(f"{api_base_url}/schedules/recurring", json=payload, headers=headers)
-        assert response.status_code == 201
-        
-        data = response.json()
-        assert "id" in data
-        assert data["cron"] == "0 9 * * *"
-        assert data["message"] == "Daily morning message"
-        
-        # Clean up
-        requests.delete(f"{api_base_url}/schedules/recurring/{data['id']}", headers=headers)
-    
-    def test_list_recurring_schedules(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """List should only return schedules for authenticated user."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        
-        # Create a schedule
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "cron": "*/10 * * * *",
-            "message": "Test",
-            "role": "user"
-        }
-        create_response = requests.post(f"{api_base_url}/schedules/recurring", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # List schedules
-        response = requests.get(f"{api_base_url}/schedules/recurring", headers=headers)
-        assert response.status_code == 200
-        
-        schedules = response.json()
-        assert isinstance(schedules, list)
-        
-        # Find our schedule
-        our_schedule = next((s for s in schedules if s["id"] == schedule_id), None)
-        assert our_schedule is not None
-        assert our_schedule["cron"] == "*/10 * * * *"
-        
-        # Clean up
-        requests.delete(f"{api_base_url}/schedules/recurring/{schedule_id}", headers=headers)
-    
-    def test_get_recurring_schedule(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """Get should return specific schedule."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        
-        # Create schedule
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "cron": "0 12 * * *",
-            "message": "Noon message",
-            "role": "user"
-        }
-        create_response = requests.post(f"{api_base_url}/schedules/recurring", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # Get schedule
-        response = requests.get(f"{api_base_url}/schedules/recurring/{schedule_id}", headers=headers)
-        assert response.status_code == 200
-        
-        data = response.json()
-        assert data["id"] == schedule_id
-        assert data["cron"] == "0 12 * * *"
-        
-        # Clean up
-        requests.delete(f"{api_base_url}/schedules/recurring/{schedule_id}", headers=headers)
-    
-    def test_delete_recurring_schedule(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """Delete should remove schedule."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        
-        # Create schedule
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "cron": "0 0 * * *",
-            "message": "To be deleted",
-            "role": "user"
-        }
-        create_response = requests.post(f"{api_base_url}/schedules/recurring", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # Delete schedule
-        delete_response = requests.delete(f"{api_base_url}/schedules/recurring/{schedule_id}", headers=headers)
-        assert delete_response.status_code == 200
-        
-        # Verify it's gone
-        get_response = requests.get(f"{api_base_url}/schedules/recurring/{schedule_id}", headers=headers)
-        assert get_response.status_code == 404
-    
-    def test_delete_nonexistent_returns_404(self, api_base_url, valid_letta_api_key):
-        """Deleting non-existent schedule should return 404."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        response = requests.delete(f"{api_base_url}/schedules/recurring/fake-uuid-123", headers=headers)
-        assert response.status_code == 404
-
-
-@pytest.mark.e2e
-class TestOneTimeScheduleCRUD:
-    def test_create_onetime_schedule(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """Should successfully create a one-time schedule."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        future_time = datetime.now(timezone.utc) + timedelta(hours=1)
-        
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "execute_at": future_time.isoformat(),
-            "message": "Future message",
-            "role": "user"
-        }
-        
-        response = requests.post(f"{api_base_url}/schedules/one-time", json=payload, headers=headers)
-        assert response.status_code == 201
-        
-        data = response.json()
-        assert "id" in data
-        assert data["execute_at"] == future_time.isoformat()
-        
-        # Clean up
-        requests.delete(f"{api_base_url}/schedules/one-time/{data['id']}", headers=headers)
-    
-    def test_list_onetime_schedules(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """List should only return user's schedules."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        future_time = datetime.now(timezone.utc) + timedelta(hours=2)
-        
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "execute_at": future_time.isoformat(),
-            "message": "Test",
-            "role": "user"
-        }
-        create_response = requests.post(f"{api_base_url}/schedules/one-time", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # List schedules
-        response = requests.get(f"{api_base_url}/schedules/one-time", headers=headers)
-        assert response.status_code == 200
-        
-        schedules = response.json()
-        our_schedule = next((s for s in schedules if s["id"] == schedule_id), None)
-        assert our_schedule is not None
-        
-        # Clean up
-        requests.delete(f"{api_base_url}/schedules/one-time/{schedule_id}", headers=headers)
-    
-    def test_delete_onetime_schedule(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """Delete should remove one-time schedule."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        future_time = datetime.now(timezone.utc) + timedelta(hours=3)
-        
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "execute_at": future_time.isoformat(),
-            "message": "To be deleted",
-            "role": "user"
-        }
-        create_response = requests.post(f"{api_base_url}/schedules/one-time", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # Delete
-        delete_response = requests.delete(f"{api_base_url}/schedules/one-time/{schedule_id}", headers=headers)
-        assert delete_response.status_code == 200
-        
-        # Verify gone
-        get_response = requests.get(f"{api_base_url}/schedules/one-time/{schedule_id}", headers=headers)
-        assert get_response.status_code == 404
-
-
-@pytest.mark.e2e
-class TestResults:
-    def test_list_results(self, api_base_url, valid_letta_api_key):
-        """Should list execution results."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        response = requests.get(f"{api_base_url}/results", headers=headers)
-        assert response.status_code == 200
-        assert isinstance(response.json(), list)
-    
-    def test_get_result_nonexistent_returns_404(self, api_base_url, valid_letta_api_key):
-        """Getting non-existent result should return 404."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        response = requests.get(f"{api_base_url}/results/fake-uuid-123", headers=headers)
-        assert response.status_code == 404
-
-
-@pytest.mark.e2e
-@pytest.mark.slow
-class TestExecution:
-    def test_past_onetime_executes_immediately(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """One-time schedule in the past should execute within 1 minute."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        past_time = datetime.now(timezone.utc) - timedelta(seconds=30)
-        
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "execute_at": past_time.isoformat(),
-            "message": "Should execute immediately",
-            "role": "user"
-        }
-        
-        create_response = requests.post(f"{api_base_url}/schedules/one-time", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # Wait up to 90 seconds for execution
-        max_wait = 90
-        for i in range(max_wait):
-            time.sleep(1)
-            
-            # Check if result exists
-            result_response = requests.get(f"{api_base_url}/results/{schedule_id}", headers=headers)
-            if result_response.status_code == 200:
-                result = result_response.json()
-                assert "run_id" in result
-                assert result["schedule_type"] == "one-time"
-                print(f"✓ Executed in {i+1} seconds, run_id: {result['run_id']}")
-                return
-        
-        pytest.fail(f"Schedule did not execute within {max_wait} seconds")
-    
-    def test_onetime_schedule_deleted_after_execution(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """One-time schedule should be deleted from filesystem after execution."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        past_time = datetime.now(timezone.utc) - timedelta(seconds=30)
-        
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "execute_at": past_time.isoformat(),
-            "message": "Should be deleted after execution",
-            "role": "user"
-        }
-        
-        create_response = requests.post(f"{api_base_url}/schedules/one-time", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # Wait for execution
-        for _ in range(90):
-            time.sleep(1)
-            result_response = requests.get(f"{api_base_url}/results/{schedule_id}", headers=headers)
-            if result_response.status_code == 200:
-                break
-        
-        # Schedule should be deleted
-        schedule_response = requests.get(f"{api_base_url}/schedules/one-time/{schedule_id}", headers=headers)
-        assert schedule_response.status_code == 404
-    
-    def test_result_persists_after_schedule_deletion(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """Execution result should persist even after schedule is deleted."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        past_time = datetime.now(timezone.utc) - timedelta(seconds=30)
-        
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "execute_at": past_time.isoformat(),
-            "message": "Test result persistence",
-            "role": "user"
-        }
-        
-        create_response = requests.post(f"{api_base_url}/schedules/one-time", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # Wait for execution
-        for _ in range(90):
-            time.sleep(1)
-            result_response = requests.get(f"{api_base_url}/results/{schedule_id}", headers=headers)
-            if result_response.status_code == 200:
-                result = result_response.json()
-                break
-        
-        # Result should still exist
-        final_result = requests.get(f"{api_base_url}/results/{schedule_id}", headers=headers)
-        assert final_result.status_code == 200
-        assert "run_id" in final_result.json()
-    
-    def test_no_duplicate_execution(self, api_base_url, valid_letta_api_key, valid_letta_agent_id):
-        """One-time schedule should execute exactly once, even if multiple executors spawn."""
-        headers = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        past_time = datetime.now(timezone.utc) - timedelta(seconds=30)
-        
-        payload = {
-            "agent_id": valid_letta_agent_id,
-            "execute_at": past_time.isoformat(),
-            "message": "Should only execute once",
-            "role": "user"
-        }
-        
-        create_response = requests.post(f"{api_base_url}/schedules/one-time", json=payload, headers=headers)
-        schedule_id = create_response.json()["id"]
-        
-        # Wait for execution
-        time.sleep(90)
-        
-        # Check result
-        result_response = requests.get(f"{api_base_url}/results/{schedule_id}", headers=headers)
-        assert result_response.status_code == 200
-        
-        # There should be exactly one result file, indicating one execution
-        # (We can't directly verify this without filesystem access, but we can check result exists)
-        result = result_response.json()
-        assert result["schedule_id"] == schedule_id
-        assert "run_id" in result
-
-
-@pytest.mark.e2e
-class TestAuthorization:
-    def test_user_isolation_list(self, api_base_url, valid_letta_api_key):
-        """Users should only see their own schedules in list."""
-        # This test requires a second valid API key
-        # Skip if not available
-        second_key = os.getenv("LETTA_API_KEY_2")
-        if not second_key:
-            pytest.skip("LETTA_API_KEY_2 not set for multi-user testing")
-        
-        headers1 = {"Authorization": f"Bearer {valid_letta_api_key}"}
-        headers2 = {"Authorization": f"Bearer {second_key}"}
-        
-        # User 1's list should not contain User 2's schedules
-        response1 = requests.get(f"{api_base_url}/schedules/recurring", headers=headers1)
-        response2 = requests.get(f"{api_base_url}/schedules/recurring", headers=headers2)
-        
-        schedules1 = response1.json()
-        schedules2 = response2.json()
-        
-        # Lists should be independent (no overlap)
-        ids1 = {s["id"] for s in schedules1}
-        ids2 = {s["id"] for s in schedules2}
-        
-        assert ids1.isdisjoint(ids2), "Users should not see each other's schedules"

+ 0 - 133
tests/test_crypto_utils.py

@@ -1,133 +0,0 @@
-import pytest
-import os
-from cryptography.fernet import Fernet
-from crypto_utils import (
-    get_api_key_hash,
-    is_dev_mode,
-    get_encryption_key,
-    encrypt_json,
-    decrypt_json,
-)
-
-
-class TestApiKeyHash:
-    def test_hash_is_deterministic(self):
-        """Same API key should produce same hash."""
-        api_key = "test-key-123"
-        hash1 = get_api_key_hash(api_key)
-        hash2 = get_api_key_hash(api_key)
-        assert hash1 == hash2
-    
-    def test_hash_is_16_chars(self):
-        """Hash should be 16 characters long."""
-        api_key = "test-key-123"
-        hash_val = get_api_key_hash(api_key)
-        assert len(hash_val) == 16
-    
-    def test_different_keys_different_hashes(self):
-        """Different API keys should produce different hashes."""
-        hash1 = get_api_key_hash("key1")
-        hash2 = get_api_key_hash("key2")
-        assert hash1 != hash2
-
-
-class TestDevMode:
-    def test_dev_mode_true(self):
-        """Dev mode should be enabled when env var is 'true'."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "true"
-        assert is_dev_mode() is True
-    
-    def test_dev_mode_1(self):
-        """Dev mode should be enabled when env var is '1'."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "1"
-        assert is_dev_mode() is True
-    
-    def test_dev_mode_yes(self):
-        """Dev mode should be enabled when env var is 'yes'."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "yes"
-        assert is_dev_mode() is True
-    
-    def test_dev_mode_false(self):
-        """Dev mode should be disabled when env var is 'false'."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "false"
-        assert is_dev_mode() is False
-    
-    def test_dev_mode_unset(self):
-        """Dev mode should be disabled when env var is not set."""
-        if "LETTA_SWITCHBOARD_DEV_MODE" in os.environ:
-            del os.environ["LETTA_SWITCHBOARD_DEV_MODE"]
-        assert is_dev_mode() is False
-
-
-class TestEncryption:
-    def test_encrypt_decrypt_roundtrip(self, encryption_key):
-        """Data should survive encryption/decryption roundtrip."""
-        original_data = {
-            "id": "test-123",
-            "message": "Hello world",
-            "nested": {"key": "value"}
-        }
-        
-        encrypted = encrypt_json(original_data, encryption_key)
-        decrypted = decrypt_json(encrypted, encryption_key)
-        
-        assert decrypted == original_data
-    
-    def test_encrypted_data_is_bytes(self, encryption_key):
-        """Encrypted data should be bytes."""
-        data = {"test": "data"}
-        encrypted = encrypt_json(data, encryption_key)
-        assert isinstance(encrypted, bytes)
-    
-    def test_dev_mode_plaintext(self):
-        """In dev mode, data should be plaintext JSON."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "true"
-        
-        data = {"test": "data", "number": 123}
-        key = b"ignored-in-dev-mode"
-        
-        encrypted = encrypt_json(data, key)
-        assert isinstance(encrypted, bytes)
-        
-        # Should be valid JSON
-        import json
-        parsed = json.loads(encrypted)
-        assert parsed == data
-    
-    def test_dev_mode_decrypt(self):
-        """In dev mode, decrypt should parse plaintext JSON."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "true"
-        
-        data = {"test": "data"}
-        key = b"ignored"
-        
-        encrypted = encrypt_json(data, key)
-        decrypted = decrypt_json(encrypted, key)
-        
-        assert decrypted == data
-    
-    def test_production_mode_encrypted(self):
-        """In production mode, data should be encrypted (not plaintext)."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "false"
-        
-        data = {"test": "secret"}
-        key = Fernet.generate_key()
-        
-        encrypted = encrypt_json(data, key)
-        
-        # Should NOT be valid JSON
-        import json
-        with pytest.raises(json.JSONDecodeError):
-            json.loads(encrypted)
-    
-    def test_wrong_key_fails(self, encryption_key):
-        """Decrypting with wrong key should fail."""
-        os.environ["LETTA_SWITCHBOARD_DEV_MODE"] = "false"
-        
-        data = {"test": "data"}
-        encrypted = encrypt_json(data, encryption_key)
-        
-        wrong_key = Fernet.generate_key()
-        
-        with pytest.raises(Exception):
-            decrypt_json(encrypted, wrong_key)

+ 0 - 123
tests/test_scheduler.py

@@ -1,123 +0,0 @@
-import pytest
-from datetime import datetime, timezone, timedelta
-from scheduler import is_recurring_schedule_due, is_onetime_schedule_due
-
-
-class TestRecurringScheduleDue:
-    def test_new_schedule_is_due(self):
-        """A newly created schedule with no last_run should be due."""
-        schedule = {
-            "cron": "* * * * *",
-            "created_at": (datetime.now(timezone.utc) - timedelta(minutes=2)).isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_recurring_schedule_due(schedule, current_time) is True
-    
-    def test_just_ran_not_due(self):
-        """A schedule that just ran should not be due yet."""
-        schedule = {
-            "cron": "*/5 * * * *",
-            "created_at": (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat(),
-            "last_run": datetime.now(timezone.utc).isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_recurring_schedule_due(schedule, current_time) is False
-    
-    def test_five_minutes_ago_is_due(self):
-        """A schedule with last_run 5+ minutes ago should be due (cron every 5 min)."""
-        schedule = {
-            "cron": "*/5 * * * *",
-            "created_at": (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat(),
-            "last_run": (datetime.now(timezone.utc) - timedelta(minutes=6)).isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_recurring_schedule_due(schedule, current_time) is True
-    
-    def test_daily_cron_not_due(self):
-        """Daily schedule that ran today should not be due."""
-        schedule = {
-            "cron": "0 9 * * *",  # Every day at 9am
-            "created_at": (datetime.now(timezone.utc) - timedelta(days=2)).isoformat(),
-            "last_run": datetime.now(timezone.utc).isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_recurring_schedule_due(schedule, current_time) is False
-    
-    def test_timezone_aware_comparison(self):
-        """Timezone-aware datetimes should be handled correctly."""
-        schedule = {
-            "cron": "*/5 * * * *",
-            "created_at": datetime.now(timezone.utc).isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        # Should not raise timezone comparison errors
-        is_recurring_schedule_due(schedule, current_time)
-
-
-class TestOneTimeScheduleDue:
-    def test_past_schedule_is_due(self):
-        """Schedule in the past should be due for execution."""
-        past_time = datetime.now(timezone.utc) - timedelta(minutes=5)
-        schedule = {
-            "execute_at": past_time.isoformat(),
-            "executed": False,
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_onetime_schedule_due(schedule, current_time) is True
-    
-    def test_future_schedule_not_due(self):
-        """Schedule in the future should not be due."""
-        future_time = datetime.now(timezone.utc) + timedelta(minutes=5)
-        schedule = {
-            "execute_at": future_time.isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_onetime_schedule_due(schedule, current_time) is False
-    
-    def test_executed_schedule_not_due(self):
-        """Already executed schedule should not be due (legacy check)."""
-        past_time = datetime.now(timezone.utc) - timedelta(minutes=5)
-        schedule = {
-            "execute_at": past_time.isoformat(),
-            "executed": True,
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_onetime_schedule_due(schedule, current_time) is False
-    
-    def test_timezone_est(self):
-        """Schedule with EST timezone should be handled correctly."""
-        # 2 hours ago EST
-        past_time = datetime.now(timezone.utc) - timedelta(hours=2)
-        schedule = {
-            "execute_at": past_time.isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_onetime_schedule_due(schedule, current_time) is True
-    
-    def test_timezone_naive_defaults_utc(self):
-        """Timezone-naive timestamps should be treated as UTC."""
-        past_time = datetime.now(timezone.utc) - timedelta(minutes=5)
-        schedule = {
-            "execute_at": past_time.replace(tzinfo=None).isoformat(),
-        }
-        current_time = datetime.now(timezone.utc)
-        
-        assert is_onetime_schedule_due(schedule, current_time) is True
-    
-    def test_exact_time_is_due(self):
-        """Schedule at exactly current time should be due."""
-        current_time = datetime.now(timezone.utc)
-        schedule = {
-            "execute_at": current_time.isoformat(),
-        }
-        
-        assert is_onetime_schedule_due(schedule, current_time) is True