Tengo una tarea de apio intensiva en la CPU. Me gustaría usar toda la potencia de procesamiento (núcleos) en muchas instancias EC2 para hacer este trabajo más rápido (una tarea distribuida en paralelo de apio con multiprocesamiento, creo ) .
Los términos, subprocesamiento , multiprocesamiento , computación distribuida , procesamiento paralelo distribuido son todos términos que estoy tratando de entender mejor.
Tarea de ejemplo:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
Usando el código anterior (con un ejemplo si es posible), ¿cómo se distribuiría esta tarea usando Celery al permitir que esta tarea se divida utilizando toda la potencia de la CPU informática en todas las máquinas disponibles en la nube?
python
django
multithreading
multiprocessing
celery
Prometeo
fuente
fuente
Respuestas:
Tus metas son:
El apio puede hacer ambas cosas con bastante facilidad. Lo primero que hay que entender es que cada trabajador de apio está configurado de forma predeterminada para ejecutar tantas tareas como núcleos de CPU haya disponibles en un sistema:
Esto significa que cada tarea individual no necesita preocuparse por el uso de multiprocesamiento / subprocesamiento para hacer uso de múltiples CPU / núcleos. En cambio, el apio ejecutará suficientes tareas al mismo tiempo para usar cada CPU disponible.
Con eso fuera del camino, el siguiente paso es crear una tarea que maneje el procesamiento de algún subconjunto de su
list_of_millions_of_ids
. Aquí tiene un par de opciones: una es hacer que cada tarea maneje una única ID, por lo que ejecuta N tareas, dondeN == len(list_of_millions_of_ids)
. Esto garantizará que el trabajo se distribuya uniformemente entre todas sus tareas, ya que nunca habrá un caso en el que un trabajador termine antes de tiempo y simplemente esté esperando; si necesita trabajo, puede sacar una identificación de la cola. Puede hacer esto (como lo menciona John Doe) usando un apiogroup
.tasks.py:
@app.task def process_id(item): id = item #long complicated equation here database.objects(newid=id).save()
Y para ejecutar las tareas:
from celery import group from tasks import process_id jobs = group(process_id.s(item) for item in list_of_millions_of_ids) result = jobs.apply_async()
Otra opción es dividir la lista en partes más pequeñas y distribuirlas a sus trabajadores. Este enfoque corre el riesgo de desperdiciar algunos ciclos, porque puede terminar con algunos trabajadores esperando mientras otros todavía están trabajando. Sin embargo, la documentación del apio señala que esta preocupación a menudo es infundada:
Por lo tanto, puede encontrar que fragmentar la lista y distribuir los fragmentos a cada tarea funciona mejor, debido a la reducción de la sobrecarga de mensajería. Probablemente también pueda aligerar un poco la carga en la base de datos de esta manera, calculando cada identificación, almacenándola en una lista y luego agregando la lista completa en la base de datos una vez que haya terminado, en lugar de hacerlo una identificación a la vez . El enfoque de fragmentación se vería así
tasks.py:
@app.task def process_ids(items): for item in items: id = item #long complicated equation here database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
Y para empezar las tareas:
from tasks import process_ids jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here. jobs.apply_async()
Puede experimentar un poco con qué tamaño de fragmento le da el mejor resultado. Desea encontrar un punto óptimo en el que reduzca la sobrecarga de mensajería y, al mismo tiempo, mantenga el tamaño lo suficientemente pequeño como para no terminar con los trabajadores terminando su parte mucho más rápido que otro trabajador y luego simplemente esperando sin nada que hacer.
fuente
threading
omultiprocessing
. En lugar de hacer eso, hacemos que cada trabajador del apio genere tantas tareas como núcleos haya disponibles en la máquina (esto sucede de forma predeterminada en el apio). Eso significa que en todo su clúster, cada núcleo se puede utilizar para procesar sulist_of_million_ids
, haciendo que cada tarea utilice un solo núcleo. Entonces, en lugar de tener una sola tarea que use muchos núcleos, tenemos muchas tareas, cada una de las cuales usa un núcleo. ¿Tiene sentido?threading
omultiprocessing
". Suponiendo que no podamos dividir esa tarea pesada en varias, ¿cómo usaría el subproceso o el multiprocesamiento para que el apio divida la tarea entre varias instancias? graciasmultiprocessing
para dividir el trabajo desde el interior de la tarea, ya que ambos enfoques requieren en última instancia hacer el Lo mismo: dividir una tarea en tareas más pequeñas que se pueden ejecutar en paralelo. Realmente solo está cambiando el punto en el que está dividiendo.multiprocessing
dentro de una tarea de Apio. El propio apio usabilliard
(unmultiprocessing
tenedor) para ejecutar sus tareas en procesos separados. Simplemente no está permitido usarlosmultiprocessing
dentro de ellos.En el mundo de la distribución solo hay una cosa que debes recordar sobre todo:
Sé que suena evidente, pero antes de distribuir la verificación doble, está utilizando el mejor algoritmo (si existe ...). Dicho esto, optimizar la distribución es un acto de equilibrio entre 3 cosas:
Las computadoras están hechas para que cuanto más se acerque a su unidad de procesamiento (3), más rápido y más eficiente (1) y (2) serán. El orden en un clúster clásico será: disco duro de red, disco duro local, RAM, dentro del territorio de la unidad de procesamiento ... Hoy en día, los procesadores se están volviendo lo suficientemente sofisticados como para ser considerados como un conjunto de unidades de procesamiento de hardware independientes comúnmente llamadas núcleos, estos núcleos procesan datos (3) a través de hilos (2). Imagine que su núcleo es tan rápido que cuando envía datos con un hilo está utilizando el 50% de la potencia de la computadora, si el núcleo tiene 2 hilos, entonces utilizará el 100%. Dos subprocesos por núcleo se denominan hiperprocesos, y su sistema operativo verá 2 CPU por núcleo hiperprocesado.
La administración de subprocesos en un procesador se denomina comúnmente multiproceso. La administración de CPU desde el sistema operativo se denomina comúnmente multiprocesamiento. La gestión de tareas simultáneas en un clúster se denomina comúnmente programación paralela. La gestión de tareas dependientes en un clúster se denomina comúnmente programación distribuida.
Entonces, ¿dónde está tu cuello de botella?
¿Y el apio?
Apio es un marco de mensajería para programación distribuida, que utilizará un módulo de intermediario para la comunicación (2) y un módulo de backend para la persistencia (1), esto significa que podrá cambiar la configuración para evitar la mayoría de los cuellos de botella (si es posible) en su red y solo en su red. Primero, perfile tu código para lograr el mejor rendimiento en una sola computadora. Luego use apio en su racimo con la configuración predeterminada y establezca
CELERY_RESULT_PERSISTENT=True
:from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//', backend='redis://localhost') @app.task def process_id(all_the_data_parameters_needed_to_process_in_this_computer): #code that does stuff return result
Durante la ejecución, abra sus herramientas de monitoreo favoritas, yo uso el predeterminado para rabbitMQ y flower para apio y top para cpus, sus resultados se guardarán en su backend. Un ejemplo de cuello de botella en la red es la cola de tareas que crece tanto que retrasan la ejecución, puede proceder a cambiar los módulos o la configuración de apio, si no su cuello de botella está en otro lugar.
fuente
Por qué no usar
group
tarea de apio para esto?http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups
Básicamente, debes dividirlos
ids
en trozos (o rangos) y asignarlos a un montón de tareas engroup
.Para algo más sofisticado, como agregar resultados de tareas particulares de apio, he usado con éxito
chord
tarea para un propósito similar:http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords
Incrementar
settings.CELERYD_CONCURRENCY
a un número que sea razonable y que pueda pagar, entonces esos trabajadores del apio seguirán ejecutando sus tareas en grupo o acorde hasta que terminen.Nota: debido a un error en
kombu
hubo problemas con la reutilización de trabajadores para una gran cantidad de tareas en el pasado, no sé si se ha solucionado ahora. Tal vez lo sea, pero si no, reduzca CELERYD_MAX_TASKS_PER_CHILD.Ejemplo basado en código simplificado y modificado que ejecuto:
@app.task def do_matches(): match_data = ... result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())
summarize
obtiene resultados de todas lassingle_batch_processor
tareas. Cada tarea se ejecuta en cualquier trabajador de Apio,kombu
coordina eso.Ahora lo entiendo:
single_batch_processor
ysummarize
TAMBIÉN tienen que ser tareas de apio, no funciones regulares; de lo contrario, por supuesto, no se paralelizará (ni siquiera estoy seguro de que el constructor de acordes lo acepte si no es una tarea de apio).fuente
chord
(con CELERYD_CONCURRENCY configurado para docenas de trabajadores == cpus lógicos / subprocesos de hardware) así es como proceso una gran cantidad de lotes de archivos de registro de manera paralela en varios núcleos.do_matches
se bloqueará esperando el acorde. Esto posiblemente podría conducir a un punto muerto parcial o total, ya que muchos o todos los trabajadores podrían esperar subtareas, ninguna de las cuales se realizará (ya que los trabajadores esperan subtareas en lugar de trabajar duro).Agregar más trabajadores de apio ciertamente acelerará la ejecución de la tarea. Sin embargo, es posible que tenga otro cuello de botella: la base de datos. Asegúrese de que pueda manejar las inserciones / actualizaciones simultáneas.
Con respecto a su pregunta: está agregando trabajadores de apio asignando otro proceso en sus instancias EC2 como
celeryd
. Dependiendo de cuántos trabajadores necesite, es posible que desee agregar incluso más instancias.fuente