The first way
Add code like the following to app_name/tasks.py:
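    from celery.schedules import crontab

    from mysite import celery_app


    @celery_app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Register the task with beat once the app is finalized.
        sender.add_periodic_task(
            schedule=crontab(minute="*/1"),  # run every minute
            # The argument is already bound in the signature, so a separate
            # args=(...) would pass it to the task a second time.
            sig=debug_periodic_task.s("[Debug periodic task]"),
            name="debug periodic task",
        )


    @celery_app.task
    def debug_periodic_task(args):
        print(args)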
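The schedules in this post use crontab(minute="*/1"), where "*/N" is a cron step expression meaning "every N minutes, counted from minute 0". The sketch below is a simplified, standard-library-only illustration of that idea and is not Celery's own parser; the helper name expand_minute_field is hypothetical, and it only handles "*", "*/N" and a single number.

```python
from __future__ import annotations


def expand_minute_field(field: str) -> list[int]:
    """Expand a cron-style minute field into the minutes (0..59) it matches.

    Simplified illustration: supports only "*", "*/N" and a single number.
    """
    if field == "*":
        # A bare star matches every minute of the hour.
        return list(range(60))
    if field.startswith("*/"):
        # "*/N" matches every N-th minute, starting at minute 0.
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be a positive integer")
        return list(range(0, 60, step))
    # Otherwise treat the field as one literal minute.
    minute = int(field)
    if not 0 <= minute < 60:
        raise ValueError("minute must be between 0 and 59")
    return [minute]


if __name__ == "__main__":
    print(len(expand_minute_field("*/1")))  # 60
    print(expand_minute_field("*/15"))      # [0, 15, 30, 45]
```

So "*/1" matches all 60 minutes, which is why the debug task fires once a minute; changing it to "*/15" would run the task four times an hour.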
The second way
Step 1: create code like the following and put it in app_name/tasks.py:
    from celery import shared_task


    @shared_task
    def debug_periodic_task(args):
        # Print the argument supplied by the beat schedule entry.
        print(args)
Step 2: add the following to proj/settings.py:
    from celery.schedules import crontab

    CELERY_BEAT_SCHEDULE = {
        "debug-periodic-task": {
            "task": "app_name.tasks.debug_periodic_task",
            "schedule": crontab(minute="*/1"),  # run every minute
            "args": ("[Periodic task]",),
        },
    }
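The CELERY_BEAT_SCHEDULE setting only takes effect if the Celery app loads its configuration from Django's settings. The post does not show that part, so the following is a sketch under assumptions: it follows the layout from Celery's Django guide, and the proj/celery.py file and the proj.settings module path are assumed rather than taken from the post.

```python
# proj/celery.py (assumed file; not shown in the original post)
import os

from celery import Celery

# Assumes the Django settings module is proj.settings.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")

app = Celery("proj")

# Read every setting prefixed with CELERY_ from Django's settings,
# so CELERY_BEAT_SCHEDULE becomes the app's beat_schedule option.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Find tasks.py modules in the installed Django apps, e.g. app_name/tasks.py.
app.autodiscover_tasks()
```

With namespace="CELERY", the prefix is what ties the setting name to Celery; if the app were configured without a namespace, the same schedule would presumably need to be set as beat_schedule instead.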