Eliminar todas las tareas pendientes en apio / rabbitmq

Respuestas:

296

De los documentos :

$ celery -A proj purge

o

from proj.celery import app
app.control.purge()

(EDITAR: actualizado con el método actual).

Philip Southam
fuente
56
O, desde Django, para el apio 3.0+: manage.py celery purge( celeryctlahora está en desuso y desaparecerá en 3.1).
Henrik Heimbuerger
3
Encontré esta respuesta buscando cómo hacer esto con un backend de redis. El mejor método que encontré fue el redis-cli KEYS "celery*" | xargs redis-cli DELque funcionó para mí. Esto eliminará todas las tareas almacenadas en el backend de redis que está utilizando.
Melignus
1
¿Cómo puedo hacer esto en apio 3.0?
luistm
2
Para mí, fue simplemente celery purge(dentro del entorno virtual relevante). Ooops: hay una respuesta con la misma a continuación ..... stackoverflow.com/a/20404976/1213425
Erve1879
Para Celery 4.0+ en combinación con Django, nuevamente es este comando, donde el argumento -Aes la aplicación Django donde celery.pyse encuentra.
gitaarik
120

Para el apio 3.0+:

$ celery purge

Para purgar una cola específica:

$ celery -Q queue_name purge
ToonAlfrink
fuente
9
Si obtiene errores de conexión, asegúrese de especificar la aplicación, por ejemplo celery -A proj purge.
Kamil Sindi
25

Para apio 2.xy 3.x:

Cuando se utiliza el trabajador con el parámetro -Q para definir colas, por ejemplo

celery worker -Q queue1,queue2,queue3

entonces celery purgeno funcionará, porque no puede pasarle los parámetros de la cola. Solo eliminará la cola predeterminada. La solución es comenzar a sus trabajadores con --purgeparámetros como este:

celery worker -Q queue1,queue2,queue3 --purge

Sin embargo, esto ejecutará al trabajador.

Otra opción es usar el subcomando amqp de apio

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
smido
fuente
Sí, esto es para versiones anteriores (2.xy quizás 3.x) de apio. No puedo editar la respuesta
smido
9

Descubrí que celery purgeno funciona para mi configuración de apio más compleja. Utilizo múltiples colas con nombre para diferentes propósitos:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
[email protected]                      0   1
[email protected]                      0   1
apns                                            0   1
[email protected]                        0   1
analytics                                       1   1
[email protected]                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

La primera columna es el nombre de la cola, la segunda es la cantidad de mensajes que esperan en la cola y la tercera es la cantidad de oyentes para esa cola. Las colas son:

  • apio - Cola para tareas de apio estándar e idempotentes
  • apns - Cola para tareas de Apple Push Notification Service, no tan idempotente
  • analytics - Cola para análisis nocturnos de larga duración
  • * .pidbox - Cola para comandos de trabajo, como apagado y restablecimiento, uno por trabajador (2 trabajadores de apio, un trabajador de apns, un trabajador de análisis)
  • bcast. * - Difundir colas, para enviar mensajes a todos los trabajadores que escuchan una cola (en lugar de solo el primero en tomarla)
  • celeryev. * - Colas de eventos de apio, para informes de análisis de tareas

La tarea de análisis es una tarea de fuerza bruta que funcionó muy bien en pequeños conjuntos de datos, pero ahora tarda más de 24 horas en procesarse. Ocasionalmente, algo saldrá mal y se quedará atascado esperando en la base de datos. Debe reescribirse, pero hasta entonces, cuando se atasca, finalizo la tarea, vacío la cola e intento nuevamente. Detecto "estancamiento" mirando el recuento de mensajes para la cola de análisis, que debería ser 0 (análisis finalizado) o 1 (esperando que finalicen los análisis de la noche anterior). 2 o superior es malo, y recibo un correo electrónico.

celery purge ofrece borrar tareas de una de las colas de transmisión, y no veo una opción para elegir una cola con nombre diferente.

Aquí está mi proceso:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
jwhitlock
fuente
Sin embargo, no es una respuesta, ¿verdad? Muy informativo sin embargo!
amn
44
celeryctl purgeno funcionó con colas con nombre. python manage.py celery amqp queue.purge <queue_name>hizo. Creo que el contexto es útil para aquellos con configuraciones complejas, por lo que pueden averiguar qué deben hacer si celeryctl purgefalla para ellos.
jwhitlock
No puedo encontrar manage.pyen mi Celery 3.1.17, ¿se ha eliminado el archivo o simplemente es nuevo? Sin embargo, encontré lo que parece la interfaz correspondiente ( queue.purge) */bin/amqp.py. Pero después de tratar de correlacionar el contenido del archivo con la documentación, debo admitir lamentablemente que Celery está lamentablemente indocumentado y también es un trabajo muy complicado, al menos juzgándolo por su código fuente.
amn
manage.pyes el script de administración de Django y manage.py celeryejecuta apio después de cargar la configuración desde la configuración de Django. No he usado apio fuera de Django, pero el celerycomando incluido puede ser lo que está buscando: celery.readthedocs.org/en/latest/userguide/monitoring.html
jwhitlock
5

En apio 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Purgue la cola con nombre:

 celery -A proj amqp queue.purge <queue name>

Purgar la cola configurada

celery -A proj purge

¿He purgado mensajes, pero todavía quedan mensajes en la cola? Respuesta: Las tareas se reconocen (se eliminan de la cola) tan pronto como se ejecutan realmente. Después de que el trabajador haya recibido una tarea, llevará un tiempo hasta que se ejecute realmente, especialmente si hay muchas tareas que ya esperan su ejecución. El trabajador conserva los mensajes que no se reconocen hasta que cierra la conexión con el intermediario (servidor AMQP). Cuando se cierra esa conexión (por ejemplo, porque el trabajador se detuvo), el intermediario reenviará las tareas al siguiente trabajador disponible (o al mismo trabajador cuando se haya reiniciado), para purgar adecuadamente la cola de tareas en espera. tiene que detener a todos los trabajadores y luego purgar las tareas usando celery.control.purge ().

Por lo tanto, para purgar toda la cola, los trabajadores deben ser detenidos.

oneklc
fuente
5

Si desea eliminar todas las tareas pendientes y también las activas y reservadas para detener completamente Celery, esto es lo que funcionó para mí:

from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
kahlo
fuente
2

1. Para purgar adecuadamente la cola de tareas en espera, debe detener a todos los trabajadores ( http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are- still-messages-left-in-the-queue ):

$ sudo rabbitmqctl stop

o (en caso de que RabbitMQ / Message Broker sea administrado por Supervisor):

$ sudo supervisorctl stop all

2. ... y luego purga las tareas de una cola específica:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Inicie RabbitMQ:

$ sudo rabbitmqctl start

o (en caso de que RabbitMQ sea administrado por Supervisor):

$ sudo supervisorctl start all
Ukr
fuente
2

celery 4+ comando de purga de apio para purgar todas las colas de tareas configuradas

celery -A *APPNAME* purge

programáticamente:

from proj.celery import app
app.control.purge()

toda la tarea pendiente será purgada. Referencia: celerydoc

Roshan Bagdiya
fuente