from celery.app.control importInspect# Inspect all nodes.
i =Inspect()# Show the items that have an ETA or are scheduled for later processing
i.scheduled()# Show tasks that are currently active.
i.active()# Show tasks that have been claimed by workers
i.reserved()
Lo intenté, pero es realmente lento (como 1 segundo). Lo estoy usando sincrónicamente en una aplicación de tornado para monitorear el progreso, por lo que tiene que ser rápido.
JulienFr
41
Esto no devolverá una lista de tareas en la cola que aún no se han procesado.
Ed J
9
Utilícelo i.reserved()para obtener una lista de tareas en cola.
Banana
44
¿Alguien ha experimentado que i.reserved () no tendrá una lista precisa de tareas activas? Tengo tareas en ejecución que no aparecen en la lista. Estoy en django-celery == 3.1.10
Seperman
66
Al especificar el trabajador tuviera que utilizar una lista como argumento: inspect(['celery@Flatty']). Enorme mejora de velocidad inspect().
Adversus
42
si está usando rabbitMQ, use esto en la terminal:
sudo rabbitmqctl list_queues
imprimirá una lista de colas con varias tareas pendientes. por ejemplo:
Estoy familiarizado con esto cuando tengo privilegios de sudo, pero quiero que un usuario del sistema sin privilegios pueda verificar. ¿Alguna sugerencia?
sabio
Además, puede canalizar esto grep -e "^celery\s" | cut -f2para extraer eso 166si desea procesar ese número más tarde, por ejemplo, para las estadísticas.
jamesc
22
Si no usa tareas priorizadas, esto es bastante simple si está usando Redis. Para que la tarea cuente:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Pero, las tareas priorizadas usan una clave diferente en redis , por lo que la imagen completa es un poco más complicada. La imagen completa es que necesita consultar redis para cada prioridad de la tarea. En python (y del proyecto Flower), esto se ve así:
PRIORITY_SEP ='\x06\x16'
DEFAULT_PRIORITY_STEPS =[0,3,6,9]def make_queue_name_for_pri(queue, pri):"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""if pri notin DEFAULT_PRIORITY_STEPS:raiseValueError('Priority not in priority steps')return'{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri)if pri else(queue,'','')))def get_queue_length(queue_name='celery'):"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names =[make_queue_name_for_pri(queue_name, pri)for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)return sum([r.llen(x)for x in priority_names])
Si desea obtener una tarea real, puede usar algo como:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0-1
A partir de ahí, deberá deserializar la lista devuelta. En mi caso pude lograr esto con algo como:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)
l = r.lrange('celery',0,-1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Solo tenga en cuenta que la deserialización puede tomar un momento, y deberá ajustar los comandos anteriores para trabajar con varias prioridades.
He actualizado lo anterior para manejar tareas priorizadas. ¡Progreso!
mlissner
1
Solo para deletrear las cosas, el DATABASE_NUMBERusado por defecto es 0, y el QUEUE_NAMEes celery, entonces redis-cli -n 0 llen celerydevolverá el número de mensajes en cola.
Vineet Bansal
Para mi apio, el nombre de la cola es en '{{{0}}}{1}{2}'lugar de '{0}{1}{2}'. Aparte de eso, ¡esto funciona perfectamente!
Si está utilizando Celery + Django, la forma más sencilla de inspeccionar tareas utilizando comandos directamente desde su terminal en su entorno virtual o utilizando una ruta completa al apio:
Si usted tiene un proyecto definen, puede utilizarcelery -A my_proj inspect reserved
sashaboulouds
6
Una solución de copiar y pegar para Redis con serialización json:
def get_celery_queue_items(queue_name):import base64
import json
# Get a configured instance of a celery app:from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True)as conn:
tasks = conn.default_channel.client.lrange(queue_name,0,-1)
decoded_tasks =[]for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)return decoded_tasks
Funciona con Django. Simplemente no te olvides de cambiar yourproject.celery.
Si está utilizando el serializador de pickle, puede cambiar la body =línea a body = pickle.loads(base64.b64decode(j['body'])).
Jim Hunziker
4
El módulo de inspección de apio parece ser consciente de las tareas desde la perspectiva de los trabajadores. Si desea ver los mensajes que están en la cola (aún no han sido extraídos por los trabajadores), sugiero utilizar pyrabbit , que puede interactuar con la API http de rabbitmq para recuperar todo tipo de información de la cola.
Creo que la única forma de obtener las tareas que están esperando es mantener una lista de las tareas que comenzó y dejar que la tarea se elimine de la lista cuando se inicie.
@daveoncode No creo que sea suficiente información para responder de manera útil. Podrías abrir tu propia pregunta. No creo que sea un duplicado de este si especificas que deseas recuperar la información en Python. Volvería a stackoverflow.com/a/19465670/9843399 , que es en lo que basé mi respuesta, y me aseguraré de que funcione primero.
Caleb Syring
@CalebSyring Este es el primer enfoque que realmente me muestra las tareas en cola. Muy agradable. El único problema para mí es que la lista de agregar no parece funcionar. ¿Alguna idea de cómo puedo hacer que la función de devolución de llamada escriba en la lista?
Varlor
@Varlor Lo siento, alguien hizo una edición incorrecta de mi respuesta. Puede buscar en la historia de edición la respuesta original, que probablemente funcionará para usted. Estoy trabajando para arreglar esto. (EDITAR: acabo de
entrar y rechacé
@CalebSyring ¡Ahora usé tu código en una clase, tener la lista como un atributo de clase funciona!
Varlor
2
Hasta donde yo sé, Celery no da API para examinar las tareas que están esperando en la cola. Esto es específico del corredor. Si usa Redis como intermediario para un ejemplo, entonces examinar las tareas que están esperando en la celerycola (predeterminada) es tan simple como:
conectarse a la base de datos del corredor
elementos de la celerylista en la lista (comando LRANGE para un ejemplo)
Tenga en cuenta que estas son tareas EN ESPERA para ser elegidas por los trabajadores disponibles. Su clúster puede tener algunas tareas ejecutándose; esas no estarán en esta lista ya que ya se han seleccionado.
Llegué a la conclusión de que la mejor manera de obtener el número de trabajos en una cola es usar rabbitmqctlcomo se ha sugerido varias veces aquí. Para permitir que cualquier usuario elegido ejecute el comando sudo, seguí las instrucciones aquí (omití editar la parte del perfil, ya que no me importa escribir sudo antes del comando).
También agarré jamesc's grepy un cutfragmento y lo envolví en llamadas de subproceso.
from subprocess importPopen, PIPE
p1 =Popen(["sudo","rabbitmqctl","list_queues","-p","[name of your virtula host"], stdout=PIPE)
p2 =Popen(["grep","-e","^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 =Popen(["cut","-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()print("number of jobs on queue: %i"% int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):return bool([Truefor i in l if k in i.values()])def check_task(task_id):
task_value_dict = inspect().active().values()for task_list in task_value_dict:if self.key_in_list(task_id, task_list):returnTruereturnFalse
Si controla el código de las tareas, puede solucionar el problema permitiendo que una tarea desencadene un reintento trivial la primera vez que se ejecuta y luego verificando inspect().reserved(). El reintento registra la tarea con el resultado backend, y el apio puede ver eso. La tarea debe aceptar selfo contextcomo primer parámetro para que podamos acceder al recuento de reintentos.
Esta solución es independiente del agente, es decir. No tiene que preocuparse de si está utilizando RabbitMQ o Redis para almacenar las tareas.
EDITAR: después de probar, he encontrado que esto es solo una solución parcial. El tamaño de reservado está limitado a la configuración de captación previa para el trabajador.
Respuestas:
EDITAR: Vea otras respuestas para obtener una lista de tareas en la cola.
Usted debe mirar aquí: Guía de apio - Inspección de trabajadores
Básicamente esto:
Dependiendo de lo que quieras
fuente
i.reserved()
para obtener una lista de tareas en cola.inspect(['celery@Flatty'])
. Enorme mejora de velocidadinspect()
.si está usando rabbitMQ, use esto en la terminal:
imprimirá una lista de colas con varias tareas pendientes. por ejemplo:
el número en la columna derecha es el número de tareas en la cola. arriba, la cola de apio tiene 166 tareas pendientes.
fuente
grep -e "^celery\s" | cut -f2
para extraer eso166
si desea procesar ese número más tarde, por ejemplo, para las estadísticas.Si no usa tareas priorizadas, esto es bastante simple si está usando Redis. Para que la tarea cuente:
Pero, las tareas priorizadas usan una clave diferente en redis , por lo que la imagen completa es un poco más complicada. La imagen completa es que necesita consultar redis para cada prioridad de la tarea. En python (y del proyecto Flower), esto se ve así:
Si desea obtener una tarea real, puede usar algo como:
A partir de ahí, deberá deserializar la lista devuelta. En mi caso pude lograr esto con algo como:
Solo tenga en cuenta que la deserialización puede tomar un momento, y deberá ajustar los comandos anteriores para trabajar con varias prioridades.
fuente
DATABASE_NUMBER
usado por defecto es0
, y elQUEUE_NAME
escelery
, entoncesredis-cli -n 0 llen celery
devolverá el número de mensajes en cola.'{{{0}}}{1}{2}'
lugar de'{0}{1}{2}'
. Aparte de eso, ¡esto funciona perfectamente!Para recuperar tareas del backend, use esto
fuente
Si está utilizando Celery + Django, la forma más sencilla de inspeccionar tareas utilizando comandos directamente desde su terminal en su entorno virtual o utilizando una ruta completa al apio:
Doc : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
Además, si está utilizando Celery + RabbitMQ , puede inspeccionar la lista de colas con el siguiente comando:
Más información : https://linux.die.net/man/1/rabbitmqctl
fuente
celery -A my_proj inspect reserved
Una solución de copiar y pegar para Redis con serialización json:
Funciona con Django. Simplemente no te olvides de cambiar
yourproject.celery
.fuente
body =
línea abody = pickle.loads(base64.b64decode(j['body']))
.El módulo de inspección de apio parece ser consciente de las tareas desde la perspectiva de los trabajadores. Si desea ver los mensajes que están en la cola (aún no han sido extraídos por los trabajadores), sugiero utilizar pyrabbit , que puede interactuar con la API http de rabbitmq para recuperar todo tipo de información de la cola.
Puede encontrar un ejemplo aquí: Recupere la longitud de la cola con Celery (RabbitMQ, Django)
fuente
Creo que la única forma de obtener las tareas que están esperando es mantener una lista de las tareas que comenzó y dejar que la tarea se elimine de la lista cuando se inicie.
Con rabbitmqctl y list_queues puede obtener una visión general de cuántas tareas están esperando, pero no las tareas en sí: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Si lo que desea incluye la tarea que se está procesando, pero aún no ha terminado, puede mantener una lista de sus tareas y verificar sus estados:
O deja que Celery almacene los resultados con CELERY_RESULT_BACKEND y verifica cuáles de tus tareas no están allí.
fuente
Esto funcionó para mí en mi aplicación:
active_jobs
habrá una lista de cadenas que corresponden a tareas en la cola.No olvides cambiar CELERY_APP_INSTANCE por el tuyo.
Gracias a @ashish por señalarme en la dirección correcta con su respuesta aquí: https://stackoverflow.com/a/19465670/9843399
fuente
jobs
siempre es cero ... alguna idea?Hasta donde yo sé, Celery no da API para examinar las tareas que están esperando en la cola. Esto es específico del corredor. Si usa Redis como intermediario para un ejemplo, entonces examinar las tareas que están esperando en la
celery
cola (predeterminada) es tan simple como:celery
lista en la lista (comando LRANGE para un ejemplo)Tenga en cuenta que estas son tareas EN ESPERA para ser elegidas por los trabajadores disponibles. Su clúster puede tener algunas tareas ejecutándose; esas no estarán en esta lista ya que ya se han seleccionado.
fuente
Llegué a la conclusión de que la mejor manera de obtener el número de trabajos en una cola es usar
rabbitmqctl
como se ha sugerido varias veces aquí. Para permitir que cualquier usuario elegido ejecute el comandosudo
, seguí las instrucciones aquí (omití editar la parte del perfil, ya que no me importa escribir sudo antes del comando).También agarré jamesc's
grep
y uncut
fragmento y lo envolví en llamadas de subproceso.fuente
fuente
Si controla el código de las tareas, puede solucionar el problema permitiendo que una tarea desencadene un reintento trivial la primera vez que se ejecuta y luego verificando
inspect().reserved()
. El reintento registra la tarea con el resultado backend, y el apio puede ver eso. La tarea debe aceptarself
ocontext
como primer parámetro para que podamos acceder al recuento de reintentos.Esta solución es independiente del agente, es decir. No tiene que preocuparse de si está utilizando RabbitMQ o Redis para almacenar las tareas.
EDITAR: después de probar, he encontrado que esto es solo una solución parcial. El tamaño de reservado está limitado a la configuración de captación previa para el trabajador.
fuente
Con
subprocess.run
:Tenga cuidado de cambiar
my_proj
conyour_proj
fuente