¿Cómo verificar el estado de la tarea en apio?

92

¿Cómo se comprueba si una tarea se está ejecutando en apio (específicamente, estoy usando celery-django)?

Leí la documentación y busqué en Google, pero no puedo ver una llamada como:

my_example_task.state() == RUNNING

Mi caso de uso es que tengo un servicio externo (java) para la transcodificación. Cuando envío un documento para que sea transcodificado, quiero verificar si la tarea que ejecuta ese servicio se está ejecutando y, de no ser así, (re) iniciarla.

Estoy usando las versiones estables actuales - 2.4, creo.

Marcin
fuente

Respuestas:

97

Devuelve el task_id (que se proporciona desde .delay ()) y luego pregunta a la instancia de apio sobre el estado:

x = method.delay(1,2)
print x.task_id

Cuando pregunte, obtenga un nuevo AsyncResult usando este task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Gregor
fuente
10
Gracias, pero ¿y si no tengo acceso x?
Marcin
4
¿Dónde coloca sus trabajos en el apio? Allí debe devolver task_id para realizar un seguimiento del trabajo en el futuro.
Gregor
A diferencia de @ Marcin, esta respuesta no usa el método estático Task.AsyncResult () como la fábrica de AsyncResult, que reutiliza útilmente la configuración del backend; de lo contrario, se genera un error al intentar obtener el resultado.
ArnauOrriols
2
@Chris La controversia con el código @gregor está en la instanciación de async_result. En su caso de uso, ya tiene la instancia, está listo para comenzar. Pero, ¿qué sucede si solo tiene la identificación de la tarea y necesita crear una async_resultinstancia para poder llamar async_result.get()? Esta es una instancia de la AsyncResultclase, pero no puede usar la clase sin formato celery.result.AsyncResult, debe obtener la clase de la función envuelta por app.task(). En tu caso lo haríasasync_result = run_instance.AsyncResult('task-id')
ArnauOrriols
1
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). - Creo que así es como se suponía que debía usarse. Lea el código: github.com/celery/celery/blob/…
nevelis
70

La creación de un AsyncResultobjeto a partir de la identificación de la tarea es la forma recomendada en las Preguntas frecuentes para obtener el estado de la tarea cuando lo único que tiene es la identificación de la tarea.

Sin embargo, a partir de Celery 3.x, existen advertencias importantes que podrían morder a las personas si no les prestan atención. Realmente depende del escenario de caso de uso específico.

De forma predeterminada, Apio no registra un estado "en ejecución".

Para que Celery registre que se está ejecutando una tarea, debe establecer task_track_starteden True. Aquí hay una tarea simple que prueba esto:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Cuando task_track_startedes False, que es el valor predeterminado, el estado muestra PENDINGaunque la tarea ha comenzado. Si lo establece task_track_starteden True, entonces el estado será STARTED.

El estado PENDINGsignifica "No sé".

Una AsyncResultcon el estado PENDINGno significa nada más que que Apio desconoce el estado de la tarea. Esto podría deberse a varias razones.

Por un lado, AsyncResultse puede construir con identificadores de tareas no válidos. Dichas "tareas" serán consideradas pendientes por Celery:

>>> task.AsyncResult("invalid").status
'PENDING'

De acuerdo, nadie va a proporcionar identificadores obviamente inválidos AsyncResult. Bastante justo, pero también tiene como efecto que AsyncResulttambién considerará una tarea que se ha ejecutado con éxito pero que Celery ha olvidado como PENDING. Nuevamente, en algunos escenarios de casos de uso esto puede ser un problema. Parte del problema depende de cómo se configura Celery para mantener los resultados de las tareas, porque depende de la disponibilidad de las "lápidas" en el backend de resultados. ("Tombstones" es el término que se usa en la documentación de Celery para los fragmentos de datos que registran cómo terminó la tarea). El uso AsyncResultno funcionará en absoluto si task_ignore_resultes así True. Un problema más irritante es que Celery expira las lápidas por defecto. losresult_expiresLa configuración predeterminada está establecida en 24 horas. Entonces, si inicia una tarea y registra la identificación en el almacenamiento a largo plazo, y más 24 horas después, crea una AsyncResultcon ella, el estado será PENDING.

Todas las "tareas reales" comienzan en el PENDINGestado. Por lo tanto, realizar PENDINGuna tarea podría significar que se solicitó la tarea, pero nunca avanzó más allá de esto (por cualquier motivo). O podría significar que la tarea se ejecutó pero Celery olvidó su estado.

¡Ay! AsyncResultno funcionará para mí. ¿Que más puedo hacer?

Prefiero realizar un seguimiento de los objetivos que realizar un seguimiento de las tareas en sí . Conservo cierta información sobre las tareas, pero en realidad es secundaria al seguimiento de los objetivos. Las porterías se almacenan en almacenamiento independiente del apio. Cuando una solicitud necesita realizar un cálculo depende de que se haya logrado algún objetivo, verifica si el objetivo ya se ha logrado, en caso afirmativo, utiliza este objetivo almacenado en caché, de lo contrario, inicia la tarea que afectará el objetivo y envía a el cliente que hizo la solicitud HTTP una respuesta que indica que debe esperar un resultado.


Los nombres de variables y los hipervínculos anteriores son para Celery 4.x. En 3.x las variables y los enlaces correspondientes son los siguientes: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

Luis
fuente
Entonces, si quiero verificar el resultado más tarde (tal vez incluso dentro de otro proceso), ¿estoy mejor con mi propia implementación? ¿Almacenar el resultado en la base de datos manualmente?
Franklin Yu
Sí, separaría el seguimiento del "objetivo" del seguimiento de las "tareas". Escribí "realizar un cálculo que depende de algún objetivo". Por lo general, el "objetivo" también es un cálculo. Por ejemplo, si quiero mostrar el artículo X a un usuario, debo convertirlo de XML a HTML, pero antes de eso, debo haber resuelto todas las referencias bibliográficas. (X es como un artículo de revista). Verifico si existe el objetivo "artículo X con todas las referencias bibliográficas resueltas" y lo uso en lugar de intentar verificar el estado de una tarea de Apio que habría calculado el objetivo que deseo.
Louis
Y la información "artículo X con todas las referencias bibliográficas resueltas" se almacena en una memoria caché y se almacena en una base de datos eXist-db.
Louis
61

Cada Taskobjeto tiene una .requestpropiedad que lo contiene AsyncRequest. En consecuencia, la siguiente línea muestra el estado de una tarea task:

task.AsyncResult(task.request.id).state
Marcin
fuente
2
¿Existe alguna forma de almacenar el porcentaje de progreso de una tarea?
patrick
4
Cuando hago esto, obtengo un AsyncResult PENDIENTE permanentemente, incluso si espero lo suficiente para que termine la tarea. ¿Hay alguna forma de hacer que esto vea cambios de estado? Creo que mi backend está configurado e intenté configurar CELERY_TRACK_STARTED = True en vano.
dstromberg
1
@dstromberg Desafortunadamente, han pasado 4 años desde que esto fue un problema para mí, así que no puedo ayudar. Es casi seguro que necesites configurar el apio para rastrear el estado.
Marcin
16

También puede crear estados personalizados y actualizar su valor según la ejecución de la tarea. Este ejemplo es de docs:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

msangel
fuente
11

Antigua pregunta, pero recientemente me encontré con este problema.

Si está intentando obtener el task_id, puede hacerlo así:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Ahora sabe exactamente qué es task_id y ahora puede usarlo para obtener AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
Cesar Rios
fuente
3
No hay absolutamente ninguna necesidad de crear su propia ID de tarea y pasarla a apply_async. El objeto devuelto por apply_async es un AsyncResultobjeto, que tiene el id de la tarea que generó Celery.
Louis
1
Corrígeme si me equivoco, pero ¿no es a veces útil generar un UUID basado en algunas entradas, de modo que todas las llamadas que reciben las mismas entradas obtienen el mismo UUID? IOW, tal vez a veces sea útil especificar su task_id.
dstromberg
1
@dstromberg La pregunta formulada por el OP es "cómo verifico el estado de la tarea" y la respuesta aquí dice "Si está tratando de obtener el task_id ...". Ni verificar el estado de la tarea, ni obtener, task_idrequiere que genere una identificación de tarea usted mismo. En su comentario, ha imaginado una razón que va más allá de "cómo verifico el estado de la tarea" y "Si está tratando de obtener el task_id ..." Genial si tiene esa necesidad, pero no es el caso aquí. (Además, usar uuid()para generar una identificación de tarea no hace absolutamente nada más allá de lo que hace Celery de forma predeterminada.)
Louis
Estoy de acuerdo en que el OP no preguntó específicamente cómo obtener ID de tareas predecibles, pero la respuesta a la pregunta del OP es actualmente "rastrear el ID de la tarea y hacer x". Me parece que rastrear el ID de la tarea no es práctico en una amplia variedad de situaciones, por lo que la respuesta puede no ser realmente satisfactoria. Esta respuesta me ayuda a resolver mi caso de uso (si puedo superar otras limitaciones señaladas) por la misma razón que señala @dstromberg, ya sea que esté motivado o no por esa razón.
Claytond
1

Respuesta de 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
Adrián García Moreno
fuente
0

Tratar:

task.AsyncResult(task.request.id).state

esto proporcionará el estado de la tarea de apio. Si la tarea de apio ya está en estado FALLO , arrojará una excepción:

raised unexpected: KeyError('exc_type',)

gogasca
fuente
0

Encontré información útil en el

Guía de trabajadores del proyecto de apio inspectores de trabajadores

En mi caso, estoy comprobando si Celery se está ejecutando.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Puede jugar con inspeccionar para satisfacer sus necesidades.

zerocog
fuente
0
  • Primero, en su APLICACIÓN de apio:

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • y luego, cambie al archivo de tareas, importe la aplicación desde su módulo de aplicación de apio.

vi tasks / task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

Usted ZhengChuan
fuente
-1

Aparte del enfoque programático anterior, se puede ver fácilmente el estado de la tarea de flor.

Monitoreo en tiempo real usando Celery Events. Flower es una herramienta basada en web para monitorear y administrar clústeres de apio.

  1. Progreso e historial de tareas
  2. Capacidad para mostrar detalles de la tarea (argumentos, hora de inicio, tiempo de ejecución y más)
  3. Gráficos y estadísticas

Documento oficial: Flor - Herramienta de seguimiento del apio

Instalación:

$ pip install flower

Uso:

http://localhost:5555
Roshan Bagdiya
fuente
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
Saurabh I
fuente
2
No publique solo el código como respuesta, sino que también proporcione una explicación de lo que hace su código y cómo resuelve el problema de la pregunta. Las respuestas con una explicación suelen ser más útiles y de mejor calidad, y es más probable que atraigan votos positivos.
Mark Rotteveel