¿Pruebas unitarias con django-celery?

82

Estoy tratando de encontrar una metodología de prueba para nuestro proyecto django-celery . He leído las notas en la documentación , pero no me dio una buena idea de qué hacer realmente. No me preocupa probar las tareas en los demonios reales, solo la funcionalidad de mi código. Principalmente me pregunto:

  1. ¿Cómo podemos omitir task.delay()durante la prueba (intenté configurar CELERY_ALWAYS_EAGER = Truepero no hizo ninguna diferencia)?
  2. ¿Cómo usamos las configuraciones de prueba recomendadas (si esa es la mejor manera) sin cambiar realmente nuestro settings.py?
  3. ¿Podemos seguir usando manage.py testo tenemos que usar un corredor personalizado?

En general, cualquier sugerencia o consejo para probar con apio sería muy útil.

Jason Webb
fuente
1
¿Qué quieres decir con CELERY_ALWAYS_EAGERque no hay diferencia?
askol
Sigo recibiendo errores sobre no poder contactar a rabbitmq.
Jason Webb
¿Tiene el rastreo? Supongo que algo más que .delaypodría estar intentando establecer una conexión.
askol
11
El entorno BROKER_BACKEND=memorypodría ayudar en ese caso.
askol
Pregúntale que tenías razón. BROKER_BACKEND=memoryarreglado. Si pones eso como respuesta, lo marcaré como correcto.
Jason Webb

Respuestas:

43

Intente configurar:

BROKER_BACKEND = 'memory'

(Gracias al comentario de askol ).

un nerd pagado
fuente
8
Creo que esto ya no es necesario cuando CELERY_ALWAYS_EAGER está configurado.
mlissner
3
¿Encontraste una solución para el apio 4?
David Schumann
72

Me gusta usar el decorador override_settings en pruebas que necesitan resultados de apio para completarse.

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

Si desea aplicar esto a todas las pruebas, puede usar el corredor de prueba de apio como se describe en http://docs.celeryproject.org/en/2.5/django/unit-testing.html que básicamente establece estas mismas configuraciones excepto ( BROKER_BACKEND = 'memory').

En la configuración de:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

Mire la fuente de CeleryTestSuiteRunner y está bastante claro lo que está sucediendo.

Joshua
fuente
1
Esto no funcionó con apio 4, incluso con los
cambios
Funciona en apio 3.1. Acabo de obtener mis casos de prueba de Apio heredados de una clase principal con este decorador. De esa manera, solo se necesita en un lugar y no hay necesidad de detenerse djcelery.
kontextify
1
Esto funciona muy bien en Apio 4.4. y Django 2.2. El mejor enfoque para ejecutar pruebas unitarias, que encontré hasta ahora.
Erik Kalkoken
18

Aquí hay un extracto de mi clase base de prueba que elimina el apply_asyncmétodo y registra las llamadas a él (lo que incluye Task.delay). Es un poco asqueroso, pero se las arregló para satisfacer mis necesidades durante los últimos meses que lo he estado usando.

from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_sync with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        self.applied_tasks.append((task_class, tuple(args), kwargs))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)

Aquí hay un ejemplo "inicial" de cómo lo usaría en sus casos de prueba:

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)
Sam Dolan
fuente
4

como todavía veo esto en los resultados de búsqueda, la configuración se anula con

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

funcionó para mí según Celery Docs

dorkforce
fuente
1

Esto es lo que hice

Dentro de myapp.tasks.py tengo:

from celery import shared_task

@shared_task()
def add(a, b):
    return a + b

Dentro de myapp.test_tasks.py tengo:

from django.test import TestCase, override_settings
from myapp.tasks import add


class TasksTestCase(TestCase):

    def setUp(self):
        ...

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True)
    def test_create_sections(self):
        result= add.delay(1,2)
        assert result.successful() == True
        assert result.get() == 3
Marco Frattallone
fuente
0

Para todos los que lleguen aquí en 2019: consulte este artículo que cubre diferentes estrategias, incluida la llamada a tareas sincrónicamente.

Sibirsky
fuente