scheduler.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. from croniter import croniter
  2. from datetime import datetime, timezone
  3. from dateutil import parser
  4. import logging
  5. logger = logging.getLogger(__name__)
  6. def is_recurring_schedule_due(schedule_dict: dict, current_time: datetime) -> bool:
  7. cron_expression = schedule_dict["cron"]
  8. last_run = schedule_dict.get("last_run")
  9. if last_run:
  10. last_run_dt = parser.parse(last_run)
  11. else:
  12. last_run_dt = parser.parse(schedule_dict["created_at"])
  13. if last_run_dt.tzinfo is None:
  14. last_run_dt = last_run_dt.replace(tzinfo=timezone.utc)
  15. if current_time.tzinfo is None:
  16. current_time = current_time.replace(tzinfo=timezone.utc)
  17. cron = croniter(cron_expression, last_run_dt)
  18. next_run = cron.get_next(datetime)
  19. if next_run.tzinfo is None:
  20. next_run = next_run.replace(tzinfo=timezone.utc)
  21. return current_time >= next_run
  22. def is_onetime_schedule_due(schedule_dict: dict, current_time: datetime) -> bool:
  23. if schedule_dict.get("executed", False):
  24. return False
  25. execute_at_str = schedule_dict["execute_at"]
  26. execute_at = parser.parse(execute_at_str)
  27. if execute_at.tzinfo is None:
  28. execute_at = execute_at.replace(tzinfo=timezone.utc)
  29. if current_time.tzinfo is None:
  30. current_time = current_time.replace(tzinfo=timezone.utc)
  31. return current_time >= execute_at