Scheduling RQ tasks with RQ scheduler¶
RqScheduler
is a lightweight alternative to Celery-beat
, it does only one thing : Schedule recurring jobs
RqScheduler documentation : https://github.com/rq/rq-scheduler
The RqScheduler daemon will tik every second and add jobs that require to be executed to the RQ job queue :
python manage.py rqscheduler --interval 1
But contrary to celery-beat
, where we define the jobs and intervals in a dictionary, RqScheduler
jobs need
to be scheduled programmatically at application startup.
We could schedule our jobs in a Django Application AppConfig
file, such as :
&> cat project/apps/my_app/apps.py
class MyAppConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "project.apps.myapp"
def ready(self):
scheduler.schedule(
scheduled_time=timezone.now(),
func=my_job,
interval=60,
repeat=None,
)
This could work in development, but it will create problems in production.
Several django instances will be running inside uwsgi/gunicorn
workers and schedule the tasks multiple times
There is an elegant and easy solution to ensure jobs are scheduled in a single entrypoint : override RqScheduler
django command and schedule our jobs there.
&> cat project/apps/my_app/management/commands/rqscheduler.py
import django_rq
from django.utils import timezone
from django_rq.management.commands import rqscheduler
scheduler = django_rq.get_scheduler("default") # use the default queue of Django-RQ
def clear_scheduled_jobs():
# Delete any existing jobs in the scheduler when the app starts up
for job in scheduler.get_jobs():
log.debug("Deleting scheduled job %s", job)
job.delete()
def register_scheduled_jobs():
# do your scheduling here
scheduler.schedule(
scheduled_time=timezone.now(),
func=my_job,
interval=60,
repeat=None,
)
class Command(rqscheduler.Command):
"""
Source : https://github.com/rq/rq-scheduler/issues/51#issuecomment-362352497
"""
help = "Run RqScheduler and schedule jobs"
def handle(self, *args, **kwargs):
# This is necessary to prevent duplicates
clear_scheduled_jobs()
register_scheduled_jobs()
super().handle(*args, **kwargs)
Conclusion¶
With this approach we will run the scheduler and automatically schedule all required jobs !