¿Cancelar una tarea que ya se está ejecutando con Celery?

96

He estado leyendo el documento y buscando, pero parece que no puedo encontrar una respuesta directa:

¿Puede cancelar una tarea que ya se está ejecutando? (ya que la tarea ha comenzado, lleva un tiempo y la mitad debe cancelarse)

Encontré esto en el documento en Preguntas frecuentes sobre apio

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

Pero no tengo claro si esto cancelará las tareas en cola o si matará un proceso en ejecución en un trabajador. ¡Gracias por cualquier luz que puedas arrojar!

dcoffey3296
fuente

Respuestas:

185

revocar cancela la ejecución de la tarea. Si se revoca una tarea, los trabajadores ignoran la tarea y no la ejecutan. Si no usa revocaciones persistentes, su tarea se puede ejecutar después del reinicio del trabajador.

http://docs.celeryproject.org/en/latest/userguide/workers.html#worker-persistent-revokes

revoke tiene una opción de terminación que es falsa por defecto. Si necesita eliminar la tarea en ejecución, debe establecer terminar en True .

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

http://docs.celeryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks

mher
fuente
3
Esta es exactamente la explicación que estaba buscando, ¡gracias!
dcoffey3296
1
¿Funciona esto en un entorno distribuido? Quiero decir si tengo trabajadores en varias máquinas que están ejecutando tareas. ¿El apio realiza un seguimiento de en qué máquina se está ejecutando la tarea?
ksrini
1
Lo hace. La comunicación con los trabajadores se realiza a través del corredor.
mher
5
result.revoke (terminate = True) debería hacer lo mismo que revoke (task_id, terminate = True)
CamHart
10
Además, el uso de la opción terminar es "un último recurso para los administradores", según los documentos recientes de Celery. Corre el riesgo de finalizar otra tarea que haya comenzado recientemente en ese trabajador.
kouk
38

En Celery 3.1, se cambia la API de revocación de tareas .

De acuerdo con las preguntas frecuentes sobre apio , debe usar result.revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

o si solo tiene la identificación de la tarea:

>>> from proj.celery import app
>>> app.control.revoke(task_id)
Rockallita
fuente
25

La respuesta de @ 0x00mh es correcta, sin embargo, los documentos recientes de apio dicen que usar la terminateopción es " un último recurso para los administradores " porque puede terminar accidentalmente otra tarea que comenzó a ejecutarse mientras tanto. Posiblemente una mejor solución es combinar terminate=Truecon signal='SIGUSR1'(lo que hace que se genere la excepción SoftTimeLimitExceeded en la tarea).

kouk
fuente
2
Esta solución funcionó muy bien para mí. Cuando SoftTimeLimitExceededse genera en mi tarea, se invoca mi lógica de limpieza personalizada (implementada a través de try/ except/ finally). Esto es mucho mejor, en mi opinión, que lo que AbortableTaskofrece ( docs.celeryproject.org/en/latest/reference/… ). Con este último, necesita un backend de resultados de base de datos y debe verificar manual y repetidamente el estado de una tarea en curso para ver si ha sido cancelada.
David Schneider
3
¿Cómo es esto mejor? Según tengo entendido, si hay alguna otra tarea recogida por el proceso, se detendrá de todos modos, solo se lanzará una excepción diferente.
marxin
Si uso, worker_prefetch_multiplier = 1ya que solo tengo algunas tareas de ejecución prolongada, la terminación debería estar bien, ya que no se realizarán otras tareas al terminar, ¿entendí esto correctamente? @spicyramen
maffe
1

Consulte las siguientes opciones para las tareas: time_limit , soft_time_limit (o puede configurarlo para los trabajadores). Si desea controlar no solo el tiempo de ejecución, consulte el argumento expires del método apply_async.

simplemente lizz
fuente