¿Cómo se prueba unitariamente una tarea de apio?

Respuestas:

61

Es posible probar tareas sincrónicamente usando cualquier lib unittest disponible. Normalmente hago 2 sesiones de prueba diferentes cuando trabajo con tareas de apio. El primero (como sugiero a continuación) es completamente sincrónico y debería ser el que se asegure de que el algoritmo haga lo que debería hacer. La segunda sesión usa todo el sistema (incluido el intermediario) y se asegura de que no tenga problemas de serialización o cualquier otro problema de distribución o comunicación.

Entonces:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

Y tu prueba:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

¡Espero que ayude!

FlaPer87
fuente
1
Eso funciona excepto en tareas que usan una HttpDispatchTask - docs.celeryproject.org/en/latest/userguide/remote-tasks.html donde tengo que configurar celery.conf.CELERY_ALWAYS_EAGER = True pero incluso con la configuración también celery.conf.CELERY_IMPORTS = ('celery.task.http') la prueba falla con NotRegistered: celery.task.http.HttpDispatchTask
davidmytton
Extraño, ¿estás seguro de que no tienes problemas de importación? Esta prueba funciona (tenga en cuenta que estoy fingiendo la respuesta para que devuelva lo que espera el apio). Además, los módulos definidos en CELERY_IMPORTS se importarán durante la inicialización de los trabajadores , para evitar esto te sugiero que llames celery.loader.import_default_modules().
FlaPer87
También te sugiero que eches un vistazo aquí . Se burla de la solicitud http. No sé si ayuda, supongo que desea probar un servicio que está en funcionamiento, ¿no es así?
FlaPer87
52

Yo uso esto:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Documentos: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER le permite ejecutar su tarea sincrónicamente y no necesita un servidor de apio.

guettli
fuente
1
Creo que esto está desactualizado, lo entiendo ImportError: No module named celeryconfig.
Daenyth
7
Creo que lo anterior asume que el módulo celeryconfig.pyexiste en el paquete de uno. Consulte docs.celeryproject.org/en/latest/getting-started/… .
Kamil Sindi
1
Sé que es antiguo, pero ¿puede proporcionar un ejemplo completo de cómo iniciar tareas adddesde la pregunta de OP dentro de una TestCaseclase?
Kruupös
@ MaxChrétien lo siento, no puedo dar un ejemplo completo, ya que ya no uso apio. Puede editar mi pregunta, si tiene suficientes puntos de reputación. Si no tiene suficiente, avíseme qué debo copiar y pegar en esta respuesta.
guettli
1
@ miken32 gracias. Como la respuesta más reciente aborda de alguna manera el problema con el que quería ayudar, acabo de dejar un comentario de que los documentos oficiales para 4.0 desalientan el uso de CELERY_TASK_ALWAYS_EAGERpruebas unitarias.
krassowski
33

Depende de lo que quieras probar exactamente.

  • Pruebe el código de la tarea directamente. No llame a "task.delay (...)" simplemente llame a "task (...)" de sus pruebas unitarias.
  • Utilice CELERY_ALWAYS_EAGER . Esto hará que sus tareas sean llamadas inmediatamente en el punto en el que diga "task.delay (...)", para que pueda probar la ruta completa (pero no ningún comportamiento asincrónico).
Slacy
fuente
24

prueba de unidad

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

accesorios de py.test

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Anexo: haga que send_task respete ansioso

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Kamil Sindi
fuente
22

Para aquellos en Apio 4 es:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Debido a que los nombres de las configuraciones se han cambiado y deben actualizarse si decide actualizar, consulte

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

okrutny
fuente
11
Según los documentos oficiales , el uso de "task_always_eager" (anteriormente "CELERY_ALWAYS_EAGER") no es adecuado para las pruebas unitarias. En su lugar, proponen otras formas excelentes de realizar pruebas unitarias de su aplicación Apio.
krassowski
4
Solo agregaré que la razón por la que no desea tareas ansiosas en sus pruebas unitarias es porque entonces no está probando, por ejemplo, la serialización de parámetros que sucederá una vez que esté usando el código en producción.
Maldito
15

A partir de apio 3.0 , una forma de configurar CELERY_ALWAYS_EAGERen Django es:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Aaron Lelevier
fuente
7

Desde Celery v4.0 , se proporcionan accesorios de py.test para iniciar un trabajador de apio solo para la prueba y se apagan cuando termina:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: [email protected] (running)>
    assert myfunc.delay().wait(3)

Entre otros accesorios descritos en http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test , puede cambiar las opciones predeterminadas de apio redefiniendo el celery_configaccesorio de esta manera:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

De forma predeterminada, el trabajador de pruebas usa un intermediario en memoria y un backend de resultados. No es necesario utilizar un Redis o RabbitMQ local si no se están probando funciones específicas.

Alanjds
fuente
Estimado votante negativo, ¿le gustaría compartir por qué esta es una mala respuesta? Sinceramente gracias.
Alanjds
2
No funcionó para mí, la suite de pruebas simplemente se cuelga. ¿Podrías proporcionar más contexto? (Aunque todavía no he votado;)).
dualidad_
En mi caso, tuve que configurar explícitamente el dispositivo celey_config para usar el intermediario de memoria y el backend de caché + memoria
sanzoghenzo
5

referencia usando pytest.

def test_add(celery_worker):
    mytask.delay()

si usa un matraz, configure la configuración de la aplicación

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

y en conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Yoge
fuente
2

En mi caso (y supongo que en muchos otros), todo lo que quería era probar la lógica interna de una tarea usando pytest.

TL; DR; Terminó burlándose de todo ( OPCIÓN 2 )


Ejemplo de caso de uso :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

pero desde shared_task decorador hace mucha lógica interna de apio, no es realmente una prueba unitaria.

Entonces, para mí, había 2 opciones:

OPCIÓN 1: Lógica interna separada

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

Esto parece muy extraño y, además de hacerlo menos legible, requiere extraer y pasar manualmente los atributos que forman parte de la solicitud, por ejemplo, task_id en caso de que lo necesite, lo que hace que la lógica sea menos pura.

OPCIÓN 2:
burlarse de las partes internas del apio

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

que luego me permite simular el objeto de la solicitud (nuevamente, en caso de que necesite cosas de la solicitud, como la identificación o el contador de reintentos.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

Esta solución es mucho más manual, pero me da el control que necesito para realizar una prueba unitaria , sin repetirme y sin perder el alcance del apio.

Daniel Dubovski
fuente