Estoy tratando de encontrar una metodología de prueba para nuestro proyecto django-celery . He leído las notas en la documentación , pero no me dio una buena idea de qué hacer realmente. No me preocupa probar las tareas en los demonios reales, solo la funcionalidad de mi código. Principalmente me pregunto:
- ¿Cómo podemos omitir
task.delay()
durante la prueba (intenté configurarCELERY_ALWAYS_EAGER = True
pero no hizo ninguna diferencia)? - ¿Cómo usamos las configuraciones de prueba recomendadas (si esa es la mejor manera) sin cambiar realmente nuestro settings.py?
- ¿Podemos seguir usando
manage.py test
o tenemos que usar un corredor personalizado?
En general, cualquier sugerencia o consejo para probar con apio sería muy útil.
python
django
unit-testing
celery
Jason Webb
fuente
fuente
CELERY_ALWAYS_EAGER
que no hay diferencia?.delay
podría estar intentando establecer una conexión.BROKER_BACKEND=memory
podría ayudar en ese caso.BROKER_BACKEND=memory
arreglado. Si pones eso como respuesta, lo marcaré como correcto.Respuestas:
Intente configurar:
BROKER_BACKEND = 'memory'
(Gracias al comentario de askol ).
fuente
Me gusta usar el decorador override_settings en pruebas que necesitan resultados de apio para completarse.
from django.test import TestCase from django.test.utils import override_settings from myapp.tasks import mytask class AddTestCase(TestCase): @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_ALWAYS_EAGER=True, BROKER_BACKEND='memory') def test_mytask(self): result = mytask.delay() self.assertTrue(result.successful())
Si desea aplicar esto a todas las pruebas, puede usar el corredor de prueba de apio como se describe en http://docs.celeryproject.org/en/2.5/django/unit-testing.html que básicamente establece estas mismas configuraciones excepto (
BROKER_BACKEND = 'memory'
).En la configuración de:
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
Mire la fuente de CeleryTestSuiteRunner y está bastante claro lo que está sucediendo.
fuente
djcelery
.Aquí hay un extracto de mi clase base de prueba que elimina el
apply_async
método y registra las llamadas a él (lo que incluyeTask.delay
). Es un poco asqueroso, pero se las arregló para satisfacer mis necesidades durante los últimos meses que lo he estado usando.from django.test import TestCase from celery.task.base import Task # For recent versions, Task has been moved to celery.task.app: # from celery.app.task import Task # See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html class CeleryTestCaseBase(TestCase): def setUp(self): super(CeleryTestCaseBase, self).setUp() self.applied_tasks = [] self.task_apply_async_orig = Task.apply_async @classmethod def new_apply_async(task_class, args=None, kwargs=None, **options): self.handle_apply_async(task_class, args, kwargs, **options) # monkey patch the regular apply_sync with our method Task.apply_async = new_apply_async def tearDown(self): super(CeleryTestCaseBase, self).tearDown() # Reset the monkey patch to the original method Task.apply_async = self.task_apply_async_orig def handle_apply_async(self, task_class, args=None, kwargs=None, **options): self.applied_tasks.append((task_class, tuple(args), kwargs)) def assert_task_sent(self, task_class, *args, **kwargs): was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2] for task in self.applied_tasks) self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args)) def assert_task_not_sent(self, task_class): was_sent = any(task_class == task[0] for task in self.applied_tasks) self.assertFalse(was_sent, 'Task was not expected to be called, but was. Applied tasks: %s' % self.applied_tasks)
Aquí hay un ejemplo "inicial" de cómo lo usaría en sus casos de prueba:
mymodule.py
from my_tasks import SomeTask def run_some_task(should_run): if should_run: SomeTask.delay(1, some_kwarg=2)
test_mymodule.py
class RunSomeTaskTest(CeleryTestCaseBase): def test_should_run(self): run_some_task(should_run=True) self.assert_task_sent(SomeTask, 1, some_kwarg=2) def test_should_not_run(self): run_some_task(should_run=False) self.assert_task_not_sent(SomeTask)
fuente
como todavía veo esto en los resultados de búsqueda, la configuración se anula con
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
funcionó para mí según Celery Docs
fuente
Esto es lo que hice
Dentro de myapp.tasks.py tengo:
from celery import shared_task @shared_task() def add(a, b): return a + b
Dentro de myapp.test_tasks.py tengo:
from django.test import TestCase, override_settings from myapp.tasks import add class TasksTestCase(TestCase): def setUp(self): ... @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True) def test_create_sections(self): result= add.delay(1,2) assert result.successful() == True assert result.get() == 3
fuente
Para todos los que lleguen aquí en 2019: consulte este artículo que cubre diferentes estrategias, incluida la llamada a tareas sincrónicamente.
fuente