scheduler.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. from croniter import croniter
  2. from datetime import datetime, timezone
  3. from dateutil import parser
  4. from zoneinfo import ZoneInfo
  5. import logging
  6. logger = logging.getLogger(__name__)
  7. def is_recurring_schedule_due(schedule_dict: dict, current_time: datetime) -> bool:
  8. cron_expression = schedule_dict["cron"]
  9. last_run = schedule_dict.get("last_run")
  10. schedule_tz_str = schedule_dict.get("timezone", "UTC")
  11. # Get the schedule's timezone
  12. try:
  13. schedule_tz = ZoneInfo(schedule_tz_str)
  14. except Exception:
  15. schedule_tz = ZoneInfo("UTC")
  16. if last_run:
  17. last_run_dt = parser.parse(last_run)
  18. else:
  19. last_run_dt = parser.parse(schedule_dict["created_at"])
  20. if last_run_dt.tzinfo is None:
  21. last_run_dt = last_run_dt.replace(tzinfo=timezone.utc)
  22. if current_time.tzinfo is None:
  23. current_time = current_time.replace(tzinfo=timezone.utc)
  24. # Convert current time to schedule's timezone for cron evaluation
  25. current_time_in_tz = current_time.astimezone(schedule_tz)
  26. last_run_in_tz = last_run_dt.astimezone(schedule_tz)
  27. cron = croniter(cron_expression, last_run_in_tz)
  28. next_run = cron.get_next(datetime)
  29. return current_time_in_tz >= next_run
  30. def is_onetime_schedule_due(schedule_dict: dict, current_time: datetime) -> bool:
  31. if schedule_dict.get("executed", False):
  32. return False
  33. execute_at_str = schedule_dict["execute_at"]
  34. execute_at = parser.parse(execute_at_str)
  35. if execute_at.tzinfo is None:
  36. execute_at = execute_at.replace(tzinfo=timezone.utc)
  37. if current_time.tzinfo is None:
  38. current_time = current_time.replace(tzinfo=timezone.utc)
  39. return current_time >= execute_at