Estoy tratando de encontrar una metodología de prueba para nuestro proyecto django-celery . He leído las notas en la documentación , pero no me dio una buena idea de qué hacer realmente. No me preocupa probar las tareas en los demonios reales, solo la funcionalidad de mi código. Principalmente me pregunto:
- ¿Cómo podemos omitir
task.delay()durante la prueba (intenté configurarCELERY_ALWAYS_EAGER = Truepero no hizo ninguna diferencia)? - ¿Cómo usamos las configuraciones de prueba recomendadas (si esa es la mejor manera) sin cambiar realmente nuestro settings.py?
- ¿Podemos seguir usando
manage.py testo tenemos que usar un corredor personalizado?
En general, cualquier sugerencia o consejo para probar con apio sería muy útil.
python
django
unit-testing
celery
Jason Webb
fuente
fuente

CELERY_ALWAYS_EAGERque no hay diferencia?.delaypodría estar intentando establecer una conexión.BROKER_BACKEND=memorypodría ayudar en ese caso.BROKER_BACKEND=memoryarreglado. Si pones eso como respuesta, lo marcaré como correcto.Respuestas:
Intente configurar:
BROKER_BACKEND = 'memory'(Gracias al comentario de askol ).
fuente
Me gusta usar el decorador override_settings en pruebas que necesitan resultados de apio para completarse.
from django.test import TestCase from django.test.utils import override_settings from myapp.tasks import mytask class AddTestCase(TestCase): @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_ALWAYS_EAGER=True, BROKER_BACKEND='memory') def test_mytask(self): result = mytask.delay() self.assertTrue(result.successful())Si desea aplicar esto a todas las pruebas, puede usar el corredor de prueba de apio como se describe en http://docs.celeryproject.org/en/2.5/django/unit-testing.html que básicamente establece estas mismas configuraciones excepto (
BROKER_BACKEND = 'memory').En la configuración de:
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'Mire la fuente de CeleryTestSuiteRunner y está bastante claro lo que está sucediendo.
fuente
djcelery.Aquí hay un extracto de mi clase base de prueba que elimina el
apply_asyncmétodo y registra las llamadas a él (lo que incluyeTask.delay). Es un poco asqueroso, pero se las arregló para satisfacer mis necesidades durante los últimos meses que lo he estado usando.from django.test import TestCase from celery.task.base import Task # For recent versions, Task has been moved to celery.task.app: # from celery.app.task import Task # See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html class CeleryTestCaseBase(TestCase): def setUp(self): super(CeleryTestCaseBase, self).setUp() self.applied_tasks = [] self.task_apply_async_orig = Task.apply_async @classmethod def new_apply_async(task_class, args=None, kwargs=None, **options): self.handle_apply_async(task_class, args, kwargs, **options) # monkey patch the regular apply_sync with our method Task.apply_async = new_apply_async def tearDown(self): super(CeleryTestCaseBase, self).tearDown() # Reset the monkey patch to the original method Task.apply_async = self.task_apply_async_orig def handle_apply_async(self, task_class, args=None, kwargs=None, **options): self.applied_tasks.append((task_class, tuple(args), kwargs)) def assert_task_sent(self, task_class, *args, **kwargs): was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2] for task in self.applied_tasks) self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args)) def assert_task_not_sent(self, task_class): was_sent = any(task_class == task[0] for task in self.applied_tasks) self.assertFalse(was_sent, 'Task was not expected to be called, but was. Applied tasks: %s' % self.applied_tasks)Aquí hay un ejemplo "inicial" de cómo lo usaría en sus casos de prueba:
mymodule.pyfrom my_tasks import SomeTask def run_some_task(should_run): if should_run: SomeTask.delay(1, some_kwarg=2)test_mymodule.pyclass RunSomeTaskTest(CeleryTestCaseBase): def test_should_run(self): run_some_task(should_run=True) self.assert_task_sent(SomeTask, 1, some_kwarg=2) def test_should_not_run(self): run_some_task(should_run=False) self.assert_task_not_sent(SomeTask)fuente
como todavía veo esto en los resultados de búsqueda, la configuración se anula con
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'funcionó para mí según Celery Docs
fuente
Esto es lo que hice
Dentro de myapp.tasks.py tengo:
from celery import shared_task @shared_task() def add(a, b): return a + bDentro de myapp.test_tasks.py tengo:
from django.test import TestCase, override_settings from myapp.tasks import add class TasksTestCase(TestCase): def setUp(self): ... @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True) def test_create_sections(self): result= add.delay(1,2) assert result.successful() == True assert result.get() == 3fuente
Para todos los que lleguen aquí en 2019: consulte este artículo que cubre diferentes estrategias, incluida la llamada a tareas sincrónicamente.
fuente