Tarea distribuida en paralelo de apio con multiprocesamiento

80

Tengo una tarea de apio intensiva en la CPU. Me gustaría usar toda la potencia de procesamiento (núcleos) en muchas instancias EC2 para hacer este trabajo más rápido (una tarea distribuida en paralelo de apio con multiprocesamiento, creo ) .

Los términos, subprocesamiento , multiprocesamiento , computación distribuida , procesamiento paralelo distribuido son todos términos que estoy tratando de entender mejor.

Tarea de ejemplo:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

Usando el código anterior (con un ejemplo si es posible), ¿cómo se distribuiría esta tarea usando Celery al permitir que esta tarea se divida utilizando toda la potencia de la CPU informática en todas las máquinas disponibles en la nube?

Prometeo
fuente
Pensé que MapReduce fue diseñado para su tipo de aplicación en mente: console.aws.amazon.com/elasticmapreduce/vnext/… :
AStopher

Respuestas:

120

Tus metas son:

  1. Distribuya su trabajo a muchas máquinas (computación distribuida / procesamiento paralelo distribuido)
  2. Distribuir el trabajo en una máquina determinada en todas las CPU (multiprocesamiento / subprocesamiento)

El apio puede hacer ambas cosas con bastante facilidad. Lo primero que hay que entender es que cada trabajador de apio está configurado de forma predeterminada para ejecutar tantas tareas como núcleos de CPU haya disponibles en un sistema:

La simultaneidad es el número de procesos de trabajo previos a la bifurcación utilizados para procesar sus tareas al mismo tiempo, cuando todos ellos están ocupados haciendo trabajo, las nuevas tareas tendrán que esperar a que finalice una de las tareas antes de que pueda procesarse.

El número de concurrencia predeterminado es el número de CPU en esa máquina (incluidos los núcleos) , puede especificar un número personalizado usando la opción -c. No hay un valor recomendado, ya que el número óptimo depende de varios factores, pero si sus tareas están en su mayoría vinculadas a E / S, entonces puede intentar aumentarlo, la experimentación ha demostrado que agregar más del doble de la cantidad de CPU rara vez es eficaz, y en su lugar probablemente degradará el rendimiento.

Esto significa que cada tarea individual no necesita preocuparse por el uso de multiprocesamiento / subprocesamiento para hacer uso de múltiples CPU / núcleos. En cambio, el apio ejecutará suficientes tareas al mismo tiempo para usar cada CPU disponible.

Con eso fuera del camino, el siguiente paso es crear una tarea que maneje el procesamiento de algún subconjunto de su list_of_millions_of_ids. Aquí tiene un par de opciones: una es hacer que cada tarea maneje una única ID, por lo que ejecuta N tareas, donde N == len(list_of_millions_of_ids). Esto garantizará que el trabajo se distribuya uniformemente entre todas sus tareas, ya que nunca habrá un caso en el que un trabajador termine antes de tiempo y simplemente esté esperando; si necesita trabajo, puede sacar una identificación de la cola. Puede hacer esto (como lo menciona John Doe) usando un apiogroup .

tasks.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

Y para ejecutar las tareas:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

Otra opción es dividir la lista en partes más pequeñas y distribuirlas a sus trabajadores. Este enfoque corre el riesgo de desperdiciar algunos ciclos, porque puede terminar con algunos trabajadores esperando mientras otros todavía están trabajando. Sin embargo, la documentación del apio señala que esta preocupación a menudo es infundada:

A algunos les puede preocupar que dividir sus tareas resulte en una degradación del paralelismo, pero esto rara vez es cierto para un clúster ocupado y, en la práctica, dado que está evitando la sobrecarga de la mensajería, puede aumentar considerablemente el rendimiento.

Por lo tanto, puede encontrar que fragmentar la lista y distribuir los fragmentos a cada tarea funciona mejor, debido a la reducción de la sobrecarga de mensajería. Probablemente también pueda aligerar un poco la carga en la base de datos de esta manera, calculando cada identificación, almacenándola en una lista y luego agregando la lista completa en la base de datos una vez que haya terminado, en lugar de hacerlo una identificación a la vez . El enfoque de fragmentación se vería así

tasks.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

Y para empezar las tareas:

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

Puede experimentar un poco con qué tamaño de fragmento le da el mejor resultado. Desea encontrar un punto óptimo en el que reduzca la sobrecarga de mensajería y, al mismo tiempo, mantenga el tamaño lo suficientemente pequeño como para no terminar con los trabajadores terminando su parte mucho más rápido que otro trabajador y luego simplemente esperando sin nada que hacer.

dano
fuente
Entonces, la parte en la que hago una "tarea complicada de CPU (tal vez renderizado en 3D)" se distribuirá automáticamente y se procesará en paralelo, es decir, 1 tarea usará tanta potencia de procesamiento como esté disponible en todas las instancias --- y todo esto fuera de -¿la caja? ¿De Verdad? Guau. PD buena respuesta gracias por explicarme esto mejor.
Prometheus
3
@Spike No del todo. Las tareas, tal como están escritas actualmente, solo pueden usar un núcleo. Para hacer que una tarea individual use más de un núcleo, introduciríamos threadingo multiprocessing. En lugar de hacer eso, hacemos que cada trabajador del apio genere tantas tareas como núcleos haya disponibles en la máquina (esto sucede de forma predeterminada en el apio). Eso significa que en todo su clúster, cada núcleo se puede utilizar para procesar su list_of_million_ids, haciendo que cada tarea utilice un solo núcleo. Entonces, en lugar de tener una sola tarea que use muchos núcleos, tenemos muchas tareas, cada una de las cuales usa un núcleo. ¿Tiene sentido?
dano
1
"Para hacer que una tarea individual utilice más de un núcleo, deberíamos introducir threadingo multiprocessing". Suponiendo que no podamos dividir esa tarea pesada en varias, ¿cómo usaría el subproceso o el multiprocesamiento para que el apio divida la tarea entre varias instancias? gracias
Tristan
@Tristan Depende de lo que realmente esté haciendo la tarea. Sin embargo, en la mayoría de los casos, diría que si no puede dividir la tarea en sí en subtareas, probablemente tendrá dificultades multiprocessingpara dividir el trabajo desde el interior de la tarea, ya que ambos enfoques requieren en última instancia hacer el Lo mismo: dividir una tarea en tareas más pequeñas que se pueden ejecutar en paralelo. Realmente solo está cambiando el punto en el que está dividiendo.
dano
1
@PirateApp Ese problema dice que no se puede usar multiprocessing dentro de una tarea de Apio. El propio apio usa billiard(un multiprocessingtenedor) para ejecutar sus tareas en procesos separados. Simplemente no está permitido usarlos multiprocessingdentro de ellos.
dano
12

En el mundo de la distribución solo hay una cosa que debes recordar sobre todo:

La optimización prematura es la fuente de todos los males. Por D. Knuth

Sé que suena evidente, pero antes de distribuir la verificación doble, está utilizando el mejor algoritmo (si existe ...). Dicho esto, optimizar la distribución es un acto de equilibrio entre 3 cosas:

  1. Escribir / leer datos de un medio persistente,
  2. Mover datos del medio A al medio B,
  3. Procesando datos,

Las computadoras están hechas para que cuanto más se acerque a su unidad de procesamiento (3), más rápido y más eficiente (1) y (2) serán. El orden en un clúster clásico será: disco duro de red, disco duro local, RAM, dentro del territorio de la unidad de procesamiento ... Hoy en día, los procesadores se están volviendo lo suficientemente sofisticados como para ser considerados como un conjunto de unidades de procesamiento de hardware independientes comúnmente llamadas núcleos, estos núcleos procesan datos (3) a través de hilos (2). Imagine que su núcleo es tan rápido que cuando envía datos con un hilo está utilizando el 50% de la potencia de la computadora, si el núcleo tiene 2 hilos, entonces utilizará el 100%. Dos subprocesos por núcleo se denominan hiperprocesos, y su sistema operativo verá 2 CPU por núcleo hiperprocesado.

La administración de subprocesos en un procesador se denomina comúnmente multiproceso. La administración de CPU desde el sistema operativo se denomina comúnmente multiprocesamiento. La gestión de tareas simultáneas en un clúster se denomina comúnmente programación paralela. La gestión de tareas dependientes en un clúster se denomina comúnmente programación distribuida.

Entonces, ¿dónde está tu cuello de botella?

  • En (1): intente persistir y transmitir desde el nivel superior (el más cercano a su unidad de procesamiento, por ejemplo, si el disco duro de la red es lento, primero guarde en el disco duro local)
  • En (2): Este es el más común, trate de evitar los paquetes de comunicación que no son necesarios para la distribución o comprima los paquetes "sobre la marcha" (por ejemplo, si el HD es lento, guarde solo un mensaje "calculado por lotes" y mantenga el resultados intermedios en RAM).
  • En (3): ¡Listo! Está utilizando toda la potencia de procesamiento a su disposición.

¿Y el apio?

Apio es un marco de mensajería para programación distribuida, que utilizará un módulo de intermediario para la comunicación (2) y un módulo de backend para la persistencia (1), esto significa que podrá cambiar la configuración para evitar la mayoría de los cuellos de botella (si es posible) en su red y solo en su red. Primero, perfile tu código para lograr el mejor rendimiento en una sola computadora. Luego use apio en su racimo con la configuración predeterminada y establezca CELERY_RESULT_PERSISTENT=True:

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

Durante la ejecución, abra sus herramientas de monitoreo favoritas, yo uso el predeterminado para rabbitMQ y flower para apio y top para cpus, sus resultados se guardarán en su backend. Un ejemplo de cuello de botella en la red es la cola de tareas que crece tanto que retrasan la ejecución, puede proceder a cambiar los módulos o la configuración de apio, si no su cuello de botella está en otro lugar.

tk.
fuente
9

Por qué no usar group tarea de apio para esto?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

Básicamente, debes dividirlos idsen trozos (o rangos) y asignarlos a un montón de tareas engroup .

Para algo más sofisticado, como agregar resultados de tareas particulares de apio, he usado con éxito chord tarea para un propósito similar:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Incrementar settings.CELERYD_CONCURRENCY a un número que sea razonable y que pueda pagar, entonces esos trabajadores del apio seguirán ejecutando sus tareas en grupo o acorde hasta que terminen.

Nota: debido a un error en kombu hubo problemas con la reutilización de trabajadores para una gran cantidad de tareas en el pasado, no sé si se ha solucionado ahora. Tal vez lo sea, pero si no, reduzca CELERYD_MAX_TASKS_PER_CHILD.

Ejemplo basado en código simplificado y modificado que ejecuto:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarizeobtiene resultados de todas las single_batch_processortareas. Cada tarea se ejecuta en cualquier trabajador de Apio, kombucoordina eso.

Ahora lo entiendo: single_batch_processory summarizeTAMBIÉN tienen que ser tareas de apio, no funciones regulares; de lo contrario, por supuesto, no se paralelizará (ni siquiera estoy seguro de que el constructor de acordes lo acepte si no es una tarea de apio).

LetMeSOThat4U
fuente
Según tengo entendido, esto dividiría la tarea, pero no está utilizando una tarea distribuida en paralelo de apio con multiprocesamiento. es decir, utilizando toda la potencia de la CPU disponible en todas las máquinas en la nube.
Prometheus
No estoy seguro de por qué sucedería esto: el apio funciona como si tuviera un grupo de trabajadores, independientemente de dónde se encuentren, incluso podrían estar ubicados en otra máquina. Por supuesto, necesita tener más de un trabajador. chord(con CELERYD_CONCURRENCY configurado para docenas de trabajadores == cpus lógicos / subprocesos de hardware) así es como proceso una gran cantidad de lotes de archivos de registro de manera paralela en varios núcleos.
LetMeSOThat4U
Este es un MUY MALO ejemplo de código. La tarea do_matchesse bloqueará esperando el acorde. Esto posiblemente podría conducir a un punto muerto parcial o total, ya que muchos o todos los trabajadores podrían esperar subtareas, ninguna de las cuales se realizará (ya que los trabajadores esperan subtareas en lugar de trabajar duro).
Prisacari Dmitrii
@PrisacariDmitrii Entonces, ¿cuál sería la solución correcta?
LetMeSOThat4U
4

Agregar más trabajadores de apio ciertamente acelerará la ejecución de la tarea. Sin embargo, es posible que tenga otro cuello de botella: la base de datos. Asegúrese de que pueda manejar las inserciones / actualizaciones simultáneas.

Con respecto a su pregunta: está agregando trabajadores de apio asignando otro proceso en sus instancias EC2 como celeryd. Dependiendo de cuántos trabajadores necesite, es posible que desee agregar incluso más instancias.

Torsten Engelbrecht
fuente
> Agregar más trabajadores de apio ciertamente acelerará la ejecución de la tarea. --- ¿Lo hace? Entonces, ¿su dicho que el apio distribuirá esa tarea entre todas mis instancias sin que tenga que cortarla?
Prometeo
Espera un segundo. Acabo de leer su código nuevamente y, dado que es solo una tarea, esto no ayudará. Puede disparar una tarea por id (o trozos de id). O sigue el consejo de John Doe en la otra respuesta. Entonces puede beneficiarse de la cantidad de trabajadores del apio. Y sí, en este caso no necesitas hacer mucho. Solo asegúrese de que los trabajadores consuman las mismas colas.
Torsten Engelbrecht