Multiprocesamiento de Python: comprensión de la lógica detrás de `chunksize`

84

¿Qué factores determinan un chunksizeargumento óptimo para métodos como multiprocessing.Pool.map()? El .map()método parece utilizar una heurística arbitraria para su tamaño de trozo predeterminado (explicado a continuación); ¿Qué motiva esa elección y hay un enfoque más reflexivo basado en alguna situación o configuración particular?

Ejemplo: digamos que soy:

  • Pasar un iterablea .map()que tiene ~ 15 millones de elementos;
  • Trabajando en una máquina con 24 núcleos y usando el predeterminado processes = os.cpu_count()dentro multiprocessing.Pool().

Mi pensamiento ingenuo es dar a cada uno de los 24 trabajadores una porción del mismo tamaño, es decir, 15_000_000 / 24625.000. Los trozos grandes deben reducir la rotación / gastos generales al tiempo que se utiliza completamente a todos los trabajadores. Pero parece que a esto le faltan algunas desventajas potenciales de dar lotes grandes a cada trabajador. ¿Es esta una imagen incompleta y qué me estoy perdiendo?


Parte de mi pregunta proviene de la lógica predeterminada para if chunksize=None: both .map()y .starmap()call .map_async(), que se ve así:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

¿Cuál es la lógica detrás divmod(len(iterable), len(self._pool) * 4)? Esto implica que el tamaño de trozo estará más cerca de 15_000_000 / (24 * 4) == 156_250. ¿Cuál es la intención de multiplicar len(self._pool)por 4?

Esto hace que el tamaño del fragmento resultante sea un factor de 4 más pequeño que mi "lógica ingenua" de arriba, que consiste en dividir la longitud del iterable por el número de trabajadores en pool._pool.

Por último, también hay este fragmento de los documentos de Python .imap()que impulsa aún más mi curiosidad:

El chunksizeargumento es el mismo que el utilizado por el map() método. Para iterables muy largos, el uso de un valor grande para chunksizepuede hacer que el trabajo se complete mucho más rápido que el uso del valor predeterminado de 1.


Respuesta relacionada que es útil pero un poco de alto nivel: multiprocesamiento de Python: ¿por qué los trozos grandes son más lentos? .

Brad Solomon
fuente
1
4Es arbitrario y todo el cálculo de chunksize es heurístico. El factor relevante es cuánto puede variar su tiempo de procesamiento real. Un poco más sobre esto aquí hasta que tenga tiempo para una respuesta si todavía es necesario.
Darkonaut
¿Ha marcado esta pregunta ?
Andrew Naguib
1
Gracias @AndrewNaguib, en realidad no me había topado con ese de alguna manera
Brad Solomon
1
Solo para hacerle saber: no olvidé esta pregunta. De hecho, estoy trabajando en una respuesta canónica de dimensiones bíblicas (muchos fragmentos de código útiles y gráficos elegantes) desde el día en que lo preguntaste. La recompensa llegó entre 1 y 2 semanas antes de la fecha límite para completarla, pero estoy seguro de que podré entregar algo lo suficientemente cerca antes de la fecha límite.
Darkonaut
@BradSolomon De nada :). ¿Responde a tu pregunta?
Andrew Naguib

Respuestas:

193

Respuesta corta

El algoritmo de chunksize de Pool es heurístico. Proporciona una solución sencilla para todos los escenarios de problemas imaginables que está intentando introducir en los métodos de Pool. Como consecuencia, no se puede optimizar para ningún escenario específico .

El algoritmo divide arbitrariamente el iterable en aproximadamente cuatro veces más partes que el enfoque ingenuo. Más fragmentos significan más gastos generales, pero una mayor flexibilidad de programación. Cómo se mostrará esta respuesta, esto conduce a una mayor utilización de los trabajadores en promedio, pero sin la garantía de un tiempo de cálculo general más corto para cada caso.

"Es bueno saberlo", podría pensar, "pero ¿cómo me ayuda saber esto con mis problemas concretos de multiprocesamiento?" Bueno, no es así. La respuesta corta más honesta es "no hay una respuesta corta", "el multiprocesamiento es complejo" y "depende". Un síntoma observado puede tener diferentes raíces, incluso para escenarios similares.

Esta respuesta intenta brindarle conceptos básicos que lo ayudarán a obtener una imagen más clara de la caja negra de programación de Pool. También intenta brindarle algunas herramientas básicas a mano para reconocer y evitar posibles acantilados en la medida en que estén relacionados con el tamaño del trozo.


Tabla de contenido

Parte I

  1. Definiciones
  2. Metas de paralelización
  3. Escenarios de paralelización
  4. Riesgos de Chunksize> 1
  5. Algoritmo de tamaño de trozos de la piscina
  6. Cuantificación de la eficiencia del algoritmo

    6.1 Modelos

    6.2 Programación paralela

    6.3 Eficiencias

    6.3.1 Eficiencia de distribución absoluta (ADE)

    6.3.2 Eficiencia de distribución relativa (RDE)

Parte II

  1. Ingenuo vs.Algoritmo de Chunksize de Pool
  2. Verificación de la realidad
  3. Conclusión

Primero es necesario aclarar algunos términos importantes.


1. Definiciones


Pedazo

Un fragmento aquí es una parte del iterableargumento especificado en una llamada al método pool. Cómo se calcula el tamaño del trozo y qué efectos puede tener, es el tema de esta respuesta.


Tarea

La representación física de una tarea en un proceso de trabajo en términos de datos se puede ver en la siguiente figura.

figure0

La figura muestra una llamada de ejemplo a pool.map(), mostrada a lo largo de una línea de código, tomada de la multiprocessing.pool.workerfunción, donde inqueuese desempaqueta una tarea leída de . workeres la función principal subyacente en MainThreadun proceso de trabajo de grupo. El func-argumento especificado en el método pool solo coincidirá con la func-variable dentro de la función worker-para métodos de llamada única como apply_asyncy para imapcon chunksize=1. Para el resto de los métodos de grupo con un chunksizeparámetro, la función de procesamiento funcserá una función de mapeador ( mapstaro starmapstar). Esta función asigna el funcparámetro especificado por el usuario en cada elemento del fragmento transmitido del iterable (-> "tareas de mapa"). El tiempo que toma, define una tareatambién como unidad de trabajo .


Taskel

Si bien el uso de la palabra "tarea" para todo el procesamiento de un fragmento se corresponde con el código interno multiprocessing.pool, no hay ninguna indicación de cómo una sola llamada al usuario especificado func, con un elemento del fragmento como argumento (s), debe ser referido a. Para evitar la confusión que surge de los conflictos de nombres (piense en el parámetro -paramaxtasksperchild el __init__método de Pool ), esta respuesta se referirá a las unidades individuales de trabajo dentro de una tarea como taskel .

Un taskel (de tarea + elemento ) es la unidad de trabajo más pequeña dentro de una tarea . Es la ejecución única de la función especificada con el funcparámetro -de un Poolmétodo-, llamado con argumentos obtenidos de un solo elemento del fragmento transmitido . Una tarea consta de chunksize tareas .


Sobrecarga de paralelización (PO)

PO consta de gastos generales internos de Python y gastos generales para la comunicación entre procesos (IPC). La sobrecarga por tarea dentro de Python viene con el código necesario para empaquetar y desempaquetar las tareas y sus resultados. IPC-overhead viene con la necesaria sincronización de hilos y la copia de datos entre diferentes espacios de direcciones (se necesitan dos pasos de copia: padre -> cola -> hijo). La cantidad de sobrecarga de IPC depende del tamaño de los datos, el hardware y el sistema operativo, lo que dificulta las generalizaciones sobre el impacto.


2. Objetivos de paralelización

Cuando usamos multiprocesamiento, nuestro objetivo general (obviamente) es minimizar el tiempo total de procesamiento para todas las tareas. Para alcanzar este objetivo general, nuestro objetivo técnico debe ser optimizar la utilización de los recursos de hardware .

Algunos subobjetivos importantes para lograr el objetivo técnico son:

  • minimizar la sobrecarga de paralelización (el más famoso, pero no solo: IPC )
  • alta utilización en todos los núcleos de cpu
  • mantener el uso de la memoria limitado para evitar que el sistema operativo pague en exceso ( papelera )

Al principio, las tareas deben ser lo suficientemente pesadas (intensivas) computacionalmente, para recuperar el PO que tenemos que pagar por la paralelización. La relevancia de PO disminuye al aumentar el tiempo de cálculo absoluto por tarea. O, para decirlo al revés, cuanto mayor sea el tiempo de cálculo absoluto por tarea para su problema, menos relevante será la necesidad de reducir el PO. Si su cálculo tomará horas por tarea, la sobrecarga de IPC será insignificante en comparación. La principal preocupación aquí es evitar que los procesos de trabajo inactivos después de que se hayan distribuido todas las tareas. Mantener todos los núcleos cargados significa que estamos paralelizando tanto como sea posible.


3. Escenarios de paralelización

¿Qué factores determinan un argumento de tamaño de trozo óptimo para métodos como el multiprocesamiento. Pool.map ()

El factor principal en cuestión es cuánto tiempo de cálculo puede variar entre nuestras tareas individuales. Para nombrarlo, la elección de un tamaño de trozo óptimo está determinada por el coeficiente de variación ( CV ) para los tiempos de cálculo por tarea.

Los dos escenarios extremos en una escala, según el alcance de esta variación son:

  1. Todas las tareas necesitan exactamente el mismo tiempo de cálculo.
  2. Una tarea puede tardar segundos o días en completarse.

Para una mejor memorización, me referiré a estos escenarios como:

  1. Escenario denso
  2. Escenario amplio


Escenario denso

En un escenario denso , sería deseable distribuir todas las tareas a la vez, para mantener la IPC necesaria y el cambio de contexto al mínimo. Esto significa que queremos crear solo la cantidad de fragmentos, la cantidad de procesos de trabajo que haya. Como ya se dijo anteriormente, el peso de PO aumenta con tiempos de cálculo más cortos por tarea.

Para un rendimiento máximo, también queremos que todos los procesos de trabajo estén ocupados hasta que se procesen todas las tareas (sin trabajadores inactivos). Para este objetivo, los fragmentos distribuidos deben ser del mismo tamaño o cerca de.


Escenario amplio

El mejor ejemplo de un escenario amplio sería un problema de optimización, donde los resultados convergen rápidamente o el cálculo puede llevar horas, si no días. Por lo general, no es predecible qué combinación de "tareas ligeras" y "tareas pesadas" contendrá una tarea en tal caso, por lo que no es aconsejable distribuir demasiadas tareas en un lote de tareas a la vez. Distribuir menos tareas a la vez de lo posible significa aumentar la flexibilidad de programación. Esto es necesario aquí para alcanzar nuestro subobjetivo de una alta utilización de todos los núcleos.

Si los Poolmétodos, por defecto, estuvieran totalmente optimizados para el escenario denso, crearían cada vez más tiempos subóptimos para cada problema ubicado más cerca del escenario amplio.


4. Riesgos de Chunksize> 1

Considere este ejemplo simplificado de pseudocódigo de un Escenario Amplio -iterable, que queremos pasar a un método de grupo:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

En lugar de los valores reales, pretendemos ver el tiempo de cálculo necesario en segundos, por simplicidad solo 1 minuto o 1 día. Suponemos que el grupo tiene cuatro procesos de trabajo (en cuatro núcleos) y chunksizeestá configurado en 2. Debido a que se mantendrá el orden, los fragmentos enviados a los trabajadores serán los siguientes:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

Como tenemos suficientes trabajadores y el tiempo de cálculo es lo suficientemente alto, podemos decir que cada proceso de trabajo tendrá una parte en la que trabajar en primer lugar. (Este no tiene por qué ser el caso para completar tareas rápidamente). Además, podemos decir que todo el procesamiento tomará aproximadamente 86400 + 60 segundos, porque ese es el tiempo de cálculo total más alto para un fragmento en este escenario artificial y distribuimos fragmentos solo una vez.

Ahora considere este iterable, que tiene solo un elemento cambiando su posición en comparación con el iterable anterior:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

... y los trozos correspondientes:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

¡Solo mala suerte con la clasificación de nuestro iterable casi duplicó (86400 + 86400) nuestro tiempo total de procesamiento! El trabajador que recibe el fragmento vicioso (86400, 86400) está bloqueando el segundo taskel pesado en su tarea para que no se distribuya a uno de los trabajadores inactivos que ya terminaron con sus (60, 60) fragmentos. Obviamente, no nos arriesgaríamos a un resultado tan desagradable si nos dispusiéramos chunksize=1.

Este es el riesgo de trozos más grandes. Con tamaños más altos, cambiamos la flexibilidad de programación por menos gastos generales y, en casos como el anterior, eso es un mal negocio.

Cómo lo veremos en el capítulo 6. Cuantificación de la eficiencia del algoritmo , los fragmentos más grandes también pueden conducir a resultados subóptimos para los escenarios densos .


5. Algoritmo de trozos de pool

A continuación, encontrará una versión ligeramente modificada del algoritmo dentro del código fuente. Como puede ver, corté la parte inferior y la envolví en una función para calcular el chunksizeargumento externamente. También reemplacé 4con un factorparámetro y subcontraté las len()llamadas.

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

Para asegurarnos de que todos estamos en la misma página, esto es lo que divmodhace:

divmod(x, y)es una función incorporada que regresa (x//y, x%y). x // yes la división de piso, que devuelve el cociente redondeado hacia abajo de x / y, mientras que x % yes la operación de módulo que devuelve el resto de x / y. De ahí, por ejemplo, divmod(10, 3)devoluciones (3, 1).

Ahora, cuando nos fijamos en chunksize, extra = divmod(len_iterable, n_workers * 4), se dará cuenta de n_workersque aquí es el divisor yen x / yy por la multiplicación 4, sin un ajuste adicional a través de if extra: chunksize +=1más adelante, conduce a una chunksize inicial de al menos cuatro veces más pequeño (por len_iterable >= n_workers * 4) de lo que sería de otra manera.

Para ver el efecto de la multiplicación por 4en el resultado de tamaño de fragmento intermedio, considere esta función:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

La función anterior calcula el tamaño de trozo ingenuo ( cs_naive) y el tamaño de trozo del primer paso del algoritmo de tamaño de trozo ( cs_pool1) de Pool , así como el tamaño de trozo del algoritmo de grupo completo ( cs_pool2). Además, calcula los factores reales rf_pool1 = cs_naive / cs_pool1 y rf_pool2 = cs_naive / cs_pool2, que nos dicen cuántas veces los tamaños de trozos calculados ingenuamente son más grandes que las versiones internas de Pool.

A continuación, verá dos figuras creadas con la salida de esta función. La figura de la izquierda solo muestra los tamaños de trozos n_workers=4hasta una longitud iterable de 500. La figura de la derecha muestra los valores de rf_pool1. Para longitudes iterables 16, el factor real se convierte en >=4(para len_iterable >= n_workers * 4) y su valor máximo es 7para longitudes iterables 28-31. Esa es una desviación masiva del factor original 4al que converge el algoritmo para iterables más largos. "Más tiempo" aquí es relativo y depende del número de trabajadores especificados.

Figura 1

Recuerde que chunksize cs_pool1todavía carece del extraajuste -con el resto del divmodcontenido cs_pool2del algoritmo completo.

El algoritmo continúa con:

if extra:
    chunksize += 1

Ahora bien, en los casos estaban allí es un resto (una extrade la DIVMOD-operación), el aumento de la chunksize por 1, obviamente, no puede trabajar para cada tarea. Después de todo, si así fuera, para empezar, no quedaría un resto.

Como se puede ver en las figuras siguientes, el " extra-tratamiento " tiene el efecto, que el verdadero factor de rf_pool2ahora converge hacia 4desde abajo 4 y la desviación es un poco más suave. Desviación estándar para n_workers=4y len_iterable=500cae de 0.5233para rf_pool1a 0.4115para rf_pool2.

Figura 2

Eventualmente, aumentar chunksizeen 1 tiene el efecto de que la última tarea transmitida solo tenga un tamaño de len_iterable % chunksize or chunksize.

Sin embargo , el efecto más interesante y como veremos más adelante, más consecuente, del tratamiento adicional se puede observar para la cantidad de fragmentos generados ( n_chunks). Para iterables lo suficientemente largos, el algoritmo de tamaño de trozos completo de Pool ( n_pool2en la figura siguiente) estabilizará el número de trozos en n_chunks == n_workers * 4. Por el contrario, el algoritmo ingenuo (después de un eructo inicial) sigue alternando entre n_chunks == n_workersy a n_chunks == n_workers + 1medida que crece la duración del iterable.

figura 3

A continuación, encontrará dos funciones de información mejoradas para Pool y el ingenuo algoritmo chunksize. La salida de estas funciones será necesaria en el próximo capítulo.

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

No se confunda por el aspecto probablemente inesperado de calc_naive_chunksize_info. El extradesde divmodno se utiliza para calcular el tamaño del fragmento.

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6. Cuantificación de la eficiencia del algoritmo

Ahora, después de haber visto cómo la salida del Poolalgoritmo de tamaño de chunksize se ve diferente en comparación con la salida del algoritmo ingenuo ...

  • ¿Cómo saber si el enfoque de Pool realmente mejora algo?
  • ¿Y qué podría ser exactamente este algo ?

Como se muestra en el capítulo anterior, para iterables más largos (una mayor cantidad de tareas), el algoritmo de tamaño de trozos de Pool divide aproximadamente el iterable en cuatro veces más partes que el método ingenuo. Los fragmentos más pequeños significan más tareas y más tareas significan más gastos indirectos de paralelización (PO) , un costo que debe sopesarse con el beneficio de una mayor flexibilidad de programación (recuerde "Riesgos de tamaño de fragmento> 1" ).

Por razones bastante obvias, el algoritmo de chunksize básico de Pool no puede sopesar la flexibilidad de programación con la PO para nosotros. La sobrecarga de IPC depende del tamaño de los datos, el hardware y el sistema operativo. El algoritmo no puede saber en qué hardware ejecutamos nuestro código, ni tiene idea de cuánto tardará una tarea en completarse. Es una heurística que proporciona una funcionalidad básica para todos los escenarios posibles. Esto significa que no se puede optimizar para ningún escenario en particular. Como se mencionó anteriormente, PO también se vuelve cada vez menos preocupante con el aumento de los tiempos de cálculo por tarea (correlación negativa).

Cuando recuerde los Objetivos de Paralelización del capítulo 2, un punto fue:

  • alta utilización en todos los núcleos de cpu

El anteriormente mencionado algo , chunksize-algoritmo de piscina puede tratar de mejorar es la minimización de ralentí trabajadores-procesos , respectivamente, la utilización de la CPU-cores .

Las multiprocessing.Poolpersonas que se preguntan acerca de los núcleos no utilizados / procesos de trabajo inactivos hacen una pregunta repetida sobre SO con respecto a situaciones en las que se esperaría que todos los procesos de trabajo estén ocupados. Si bien esto puede tener muchas razones, los procesos de trabajo inactivos hacia el final de un cálculo son una observación que a menudo podemos hacer, incluso con Escenarios densos (tiempos de cálculo iguales por tarea) en los casos en que el número de trabajadores no es un divisor del número. de trozos ( n_chunks % n_workers > 0).

La pregunta ahora es:

¿Cómo podemos traducir prácticamente nuestra comprensión de los fragmentos en algo que nos permita explicar la utilización observada del trabajador, o incluso comparar la eficiencia de diferentes algoritmos en ese sentido?


6.1 Modelos

Para obtener información más profunda aquí, necesitamos una forma de abstracción de cálculos paralelos que simplifique la realidad demasiado compleja hasta un grado manejable de complejidad, al tiempo que conserva la importancia dentro de los límites definidos. Tal abstracción se llama modelo . Una implementación de tal " Modelo de Paralelización" (PM) genera metadatos mapeados por el trabajador (marcas de tiempo) como lo harían los cálculos reales, si los datos fueran recopilados. Los metadatos generados por el modelo permiten predecir métricas de cálculos paralelos bajo ciertas restricciones.

Figura 4

Uno de los dos submodelos dentro del PM definido aquí es el Modelo de Distribución (DM) . El DM explica cómo las unidades atómicas de trabajo (taskels) se distribuyen entre los trabajadores paralelos y el tiempo , cuando no se consideran otros factores que el algoritmo chunksize-size respectivo, el número de trabajadores, el input-iterable (número de taskels) y su duración de cálculo. . Esto significa que no se incluye ningún tipo de gastos generales .

Para obtener un MP completo , el DM se amplía con un Modelo de Sobrecarga (OM) , que representa varias formas de Sobrecarga de Paralelización (PO) . Dicho modelo debe calibrarse para cada nodo individualmente (dependencias de hardware, SO). Se deja abierta la cantidad de formas de sobrecarga representadas en un OM y, por lo tanto, pueden existir múltiples OM con diversos grados de complejidad. El nivel de precisión que necesita el OM implementado está determinado por el peso total de PO para el cálculo específico. Las tareas más cortas conducen a un mayor peso de PO , lo que a su vez requiere un OM más precisosi estuviéramos intentando predecir las Eficiencias de Paralelización (PE) .


6.2 Programación paralela (PS)

El programa paralelo es una representación bidimensional del cálculo paralelo, donde el eje x representa el tiempo y el eje y representa un grupo de trabajadores paralelos. El número de trabajadores y el tiempo total de cálculo marcan la extensión de un rectángulo, en el que se dibujan rectángulos más pequeños. Estos rectángulos más pequeños representan unidades atómicas de trabajo (taskels).

A continuación, encontrará la visualización de un PS extraído con datos del DM del algoritmo de tamaño de grupo para el escenario denso .

Figura 5

  • El eje x se divide en unidades de tiempo iguales, donde cada unidad representa el tiempo de cálculo que requiere un taskel.
  • El eje y se divide en el número de procesos de trabajo que utiliza el grupo.
  • Un taskel aquí se muestra como el rectángulo de color cian más pequeño, colocado en una línea de tiempo (un programa) de un proceso de trabajo anónimo.
  • Una tarea es una o varias tareas en una línea de tiempo de trabajador resaltadas continuamente con el mismo tono.
  • Las unidades de tiempo de inactividad se representan mediante mosaicos de color rojo.
  • El horario paralelo está dividido en secciones. La última sección es la sección de la cola.

Los nombres de las partes compuestas se pueden ver en la siguiente imagen.

figura 6

En un PM completo que incluye un OM , Idling Share no se limita a la cola, sino que también comprende el espacio entre tareas e incluso entre tareas.


6.3 Eficiencias

Los modelos presentados anteriormente permiten cuantificar la tasa de utilización de los trabajadores. Podemos distinguir:

  • Eficiencia de distribución (DE) : calculada con ayuda de un DM (o un método simplificado para escenario denso ).
  • Eficiencia de paralelización (PE) : calculada con la ayuda de un PM calibrado (predicción) o calculada a partir de metadatos de cálculos reales.

Es importante tener en cuenta que las eficiencias calculadas no se correlacionan automáticamente con un cálculo general más rápido para un problema de paralelización determinado. La utilización del trabajador en este contexto solo distingue entre un trabajador que tiene una tarea iniciada pero no terminada y un trabajador que no tiene una tarea tan "abierta". Eso significa que no se registra la posible inactividad durante el período de tiempo de un taskel .

Todas las eficiencias mencionadas anteriormente se obtienen básicamente mediante el cálculo del cociente de la división Ocupado Compartido / Horario Paralelo . La diferencia entre DE y PE viene con el Ocupado ocupando una porción más pequeña del Programa Paralelo general para el MP extendido. .

Esta respuesta solo discutirá un método simple para calcular DE para el escenario denso. Esto es lo suficientemente adecuado para comparar diferentes algoritmos de tamaño de trozo, ya que ...

  1. ... el DM es la parte del PM , que cambia con los diferentes algoritmos de tamaño de trozo empleados.
  2. ... el escenario denso con duraciones de cálculo iguales por grupo de tareas representa un "estado estable", por lo que estos períodos de tiempo se eliminan de la ecuación. Cualquier otro escenario conduciría a resultados aleatorios, ya que el orden de las tareas sería importante.

6.3.1 Eficiencia de distribución absoluta (ADE)

Esta eficiencia básica se puede calcular en general dividiendo la participación ocupada entre todo el potencial del horario paralelo :

Eficiencia de distribución absoluta (ADE) = Participación ocupada / Programa paralelo

Para el escenario denso , el código de cálculo simplificado se ve así:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade

Si no hay cuota inactiva , la cuota ocupada será igual a la programación paralela , por lo que obtenemos un ADE del 100%. En nuestro modelo simplificado, este es un escenario en el que todos los procesos disponibles estarán ocupados durante todo el tiempo necesario para procesar todas las tareas. En otras palabras, todo el trabajo se paraleliza efectivamente al 100 por ciento.

Pero, ¿por qué sigo refiriéndome a la educación física como educación física absoluta aquí?

Para comprender eso, tenemos que considerar un caso posible para el tamaño de chunksize (cs) que garantiza la máxima flexibilidad de programación (también, el número de montañeses que puede haber. ¿Coincidencia?):

__________________________________ ~ UNO ~ __________________________________

Si, por ejemplo, tenemos cuatro procesos de trabajo y 37 tareas, habrá trabajadores inactivos incluso con chunksize=1, solo porque n_workers=4no es un divisor de 37. El resto de la división 37/4 es 1. Esta única tarea restante tendrá que ser procesado por un solo trabajador, mientras que los tres restantes están inactivos.

Del mismo modo, todavía habrá un trabajador inactivo con 39 tareas, como puede ver en la imagen de abajo.

figura 7

Cuando compare el Programa paralelo superior para chunksize=1con la versión siguiente para chunksize=3, notará que el Programa paralelo superior es más pequeño, la línea de tiempo en el eje x más corta. Debería ser obvio ahora, cómo los fragmentos más grandes de forma inesperada también pueden conducir a un aumento de los tiempos de cálculo generales, incluso para los escenarios densos .

Pero, ¿por qué no usar la longitud del eje x para los cálculos de eficiencia?

Porque los gastos generales no están contenidos en este modelo. Será diferente para ambos tamaños de trozos, por lo que el eje x no es realmente comparable directamente. La sobrecarga aún puede conducir a un tiempo total de cálculo más largo, como se muestra en el caso 2 de la figura siguiente.

figura 8


6.3.2 Eficiencia de distribución relativa (RDE)

El valor de ADE no contiene la información si es posible una mejor distribución de las tareas con chunksize establecido en 1. Mejor aquí todavía significa una participación inactiva más pequeña .

Para obtener un valor DE ajustado por la máxima DE posible , tenemos que dividir el ADE considerado entre el ADE que obtenemos chunksize=1.

Eficiencia de distribución relativa (RDE) = ADE_cs_x / ADE_cs_1

Así es como se ve esto en el código:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde

RDE , como se define aquí, en esencia es un cuento sobre la cola de un Programa Paralelo . RDE está influenciado por el tamaño de trozo máximo efectivo contenido en la cola. (Esta cola puede tener la longitud del eje x chunksizeo last_chunk). Esto tiene la consecuencia de que el RDE converge naturalmente al 100% (uniforme) para todo tipo de "apariencia de cola" como se muestra en la figura siguiente.

figura 9

Un RDE bajo ...

  • es un fuerte indicio de potencial de optimización.
  • naturalmente, se vuelve menos probable para iterables más largos, porque la porción de cola relativa de la programación paralela general se reduce.

Encuentre la Parte II de esta respuesta aquí .

Darkonaut
fuente
51
Una de las respuestas más épicas que he visto en SO.
Christian Long
4
Oh, esta fue tu respuesta corta: P
d_kennetz
1
Pero de verdad ... esta es una excelente respuesta. He destacado la pregunta para casos futuros en los que quiero entender esto mejor. ¡Hojearlo ya me enseñó mucho! Gracias
d_kennetz
2
@ L.Iridium ¡De nada! Hice uso matplotlib en lo posible y de lo contrario ... LibreOffice Calc + Pinta (edición básica de imágenes). Sí, lo sé ... pero funciona, de alguna manera. ;)
Darkonaut
2
Primera respuesta con una tabla de contenido visto en SO.
tly_alex
51

Acerca de esta respuesta

Esta respuesta es la Parte II de la respuesta aceptada anterior .


7. Algoritmo de tamaño ingenuo vs.

Antes de entrar en detalles, considere los dos gifs a continuación. Para un rango de diferentes iterablelongitudes, muestran cómo los dos algoritmos comparados fragmentan el pasado iterable(será una secuencia para entonces) y cómo se podrían distribuir las tareas resultantes. El orden de los trabajadores es aleatorio y el número de tareas distribuidas por trabajador en realidad puede diferir de estas imágenes para tareas ligeras o tareas en un escenario amplio. Como se mencionó anteriormente, los gastos generales tampoco se incluyen aquí. Sin embargo, para tareas lo suficientemente pesadas en un escenario denso con tamaños de datos transmitidos despreciables, los cálculos reales dibujan una imagen muy similar.

cs_4_50

cs_200_250

Como se muestra en el capítulo " 5. Chunksize-Algorithm de Pool", con el algoritmo de Chunksize de Pool, el número de fragmentos se estabilizará n_chunks == n_workers * 4para iterables suficientemente grandes, mientras sigue cambiando entre n_chunks == n_workersy n_chunks == n_workers + 1con el enfoque ingenuo. Para el algoritmo ingenuo se aplica: Porque n_chunks % n_workers == 1es Truepara n_chunks == n_workers + 1, se creará una nueva sección donde solo se empleará un solo trabajador.

Algoritmo ingenuo de tamaño de trozos:

Puede pensar que creó tareas en la misma cantidad de trabajadores, pero esto solo será cierto para los casos en los que no haya resto para len_iterable / n_workers. Si queda un resto, habrá una nueva sección con una sola tarea para un solo trabajador. En ese punto, su cálculo ya no será paralelo.

A continuación, verá una figura similar a la que se muestra en el capítulo 5, pero que muestra el número de secciones en lugar del número de fragmentos. Para el algoritmo de tamaño completo de Pool ( n_pool2), n_sectionsse estabilizará en el infame factor codificado 4. Para el algoritmo ingenuo, n_sectionsse alternará entre uno y dos.

figura 10

Para el algoritmo de chunksize de Pool, la estabilización a n_chunks = n_workers * 4través del tratamiento adicional mencionado anteriormente , evita la creación de una nueva sección aquí y mantiene la cuota inactiva limitada a un trabajador durante iterables lo suficientemente largos. No solo eso, sino que el algoritmo seguirá reduciendo el tamaño relativo de la cuota inactiva , lo que conduce a un valor de RDE que converge hacia el 100%.

"Lo suficientemente largo" para n_workers=4es, len_iterable=210por ejemplo. Para iterables iguales o más grandes que eso, la participación inactiva se limitará a un trabajador, un rasgo que originalmente se perdió debido a la 4-multiplicación dentro del algoritmo chunksize-en primer lugar.

figura 11

El ingenuo algoritmo chunksize también converge hacia el 100%, pero lo hace más lento. El efecto de convergencia depende únicamente del hecho de que la parte relativa de la cola se contrae en los casos en que habrá dos secciones. Esta cola con un solo trabajador empleado se limita a la longitud del eje x n_workers - 1, el resto máximo posible para len_iterable / n_workers.

¿En qué se diferencian los valores reales de RDE para el algoritmo de tamaño de trozo de Pool y el ingenuo?

A continuación, encontrará dos mapas de calor que muestran los valores de RDE para todas las longitudes iterables hasta 5000, para todos los números de trabajadores desde 2 hasta 100. La escala de colores va de 0,5 a 1 (50% -100%). Notará áreas mucho más oscuras (valores RDE más bajos) para el algoritmo ingenuo en el mapa de calor de la izquierda. Por el contrario, el algoritmo de tamaño de trozo de Pool a la derecha dibuja una imagen mucho más brillante.

figura 12

El gradiente diagonal de las esquinas oscuras de la parte inferior izquierda frente a las esquinas brillantes de la parte superior derecha muestra nuevamente la dependencia del número de trabajadores para lo que se puede llamar un "iterable largo".

¿Qué tan malo puede llegar a ser con cada algoritmo?

Con el algoritmo de tamaño de trozo de Pool, un valor de RDE de 81,25% es el valor más bajo para el rango de trabajadores y longitudes iterables especificadas anteriormente:

figura 13

Con el ingenuo algoritmo chunksize, las cosas pueden empeorar mucho. El RDE calculado más bajo aquí es 50,72%. En este caso, ¡casi la mitad del tiempo de cálculo solo se ejecuta un trabajador! Así que, cuidado, orgullosos propietarios de Knights Landing . ;)

figura 14


8. Verificación de la realidad

En los capítulos anteriores consideramos un modelo simplificado para el problema de distribución puramente matemático, despojado de los detalles esenciales que hacen del multiprocesamiento un tema tan espinoso en primer lugar. Para comprender mejor hasta qué punto el modelo de distribución (DM) por sí solo puede contribuir a explicar la utilización observada de los trabajadores en la realidad, ahora veremos algunas programaciones paralelas dibujadas mediante cálculos reales .

Preparar

Las siguientes gráficas tratan todas con ejecuciones paralelas de una función ficticia simple, unida a la CPU, que se llama con varios argumentos para que podamos observar cómo la Programación paralela dibujada varía en dependencia de los valores de entrada. El "trabajo" dentro de esta función consiste solo en la iteración sobre un objeto de rango. Esto ya es suficiente para mantener un núcleo ocupado, ya que pasamos grandes números. Opcionalmente, la función toma un extra exclusivo de la tarea dataque simplemente se devuelve sin cambios. Dado que cada taskel comprende exactamente la misma cantidad de trabajo, todavía estamos tratando con un escenario denso aquí.

La función está decorada con un contenedor que toma marcas de tiempo con resolución ns (Python 3.7+). Las marcas de tiempo se utilizan para calcular el intervalo de tiempo de un taskel y, por lo tanto, permiten dibujar un horario paralelo empírico.

@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper

El método de mapa de estrellas de Pool también está decorado de tal manera que solo se cronometra la llamada de mapa de estrellas. El "inicio" y el "final" de esta llamada determinan el mínimo y el máximo en el eje x del programa paralelo producido.

Vamos a observar el cálculo de 40 tareas en cuatro procesos de trabajo en una máquina con estas especificaciones: Python 3.7.1, Ubuntu 18.04.2, CPU Intel® Core ™ i7-2600K @ 3.40GHz × 8

Los valores de entrada que se variarán son el número de iteraciones en el bucle for (30k, 30M, 600M) y el tamaño de los datos de envío adicional (por taskel, numpy-ndarray: 0 MiB, 50 MiB).

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

Las ejecuciones que se muestran a continuación se seleccionaron a dedo para tener el mismo orden de fragmentos, de modo que pueda detectar mejor las diferencias en comparación con la programación paralela del modelo de distribución, pero no olvide que el orden en el que los trabajadores realizan su tarea no es determinista.

Predicción DM

Para reiterar, el modelo de distribución "predice" una programación paralela como ya lo hemos visto antes en el capítulo 6.2:

figura 15

1a EJECUCIÓN: 30k iteraciones y 0 datos MiB por tarea

figura 16

Nuestra primera ejecución aquí es muy corta, las tareas son muy "ligeras". La pool.starmap()llamada completa solo tomó 14,5 ms en total. Notarás que, al contrario que con el DM , el ralentí no se limita a la sección de cola, sino que también tiene lugar entre tareas e incluso entre tareas. Eso es porque nuestro horario real aquí incluye naturalmente todo tipo de gastos generales. Ralentí aquí significa todo lo que está fuera de un taskel. Posible verdadera ralentí durante un taskel no se captura la forma ya mencionada antes.

Además, puede ver que no todos los trabajadores realizan sus tareas al mismo tiempo. Eso se debe al hecho de que todos los trabajadores se alimentan de un elemento compartido inqueuey solo un trabajador puede leerlo a la vez. Lo mismo se aplica a outqueue. Esto puede causar mayores problemas tan pronto como esté transmitiendo tamaños de datos no marginales, como veremos más adelante.

Además, puede ver que, a pesar de que cada taskel comprende la misma cantidad de trabajo, el intervalo de tiempo real medido para un taskel varía mucho. Las tareas distribuidas a worker-3 y worker-4 necesitan más tiempo que las procesadas por los dos primeros trabajadores. Para esta ejecución, sospecho que se debe a que el turbo boost ya no está disponible en los núcleos para worker-3/4 en ese momento, por lo que procesaron sus tareas con una frecuencia de reloj más baja.

Todo el cálculo es tan ligero que los factores de caos introducidos por el hardware o el sistema operativo pueden sesgar el PS drásticamente. El cálculo es una "hoja en el viento" y la predicción DM tiene poca importancia, incluso para un escenario teóricamente adecuado.

2da EJECUCIÓN: 30 millones de iteraciones y 0 datos MiB por tarea

figura 17

Aumentar el número de iteraciones en el bucle for de 30.000 a 30 millones da como resultado un Programa Paralelo real que se acerca a una coincidencia perfecta con el predicho por los datos proporcionados por el DM , ¡hurra! El cálculo por tarea ahora es lo suficientemente pesado como para marginar las partes inactivas al principio y en el medio, dejando solo visible la gran parte inactiva que predijo el DM .

3a EJECUCIÓN: 30 millones de iteraciones y 50 MiB de datos por tarea

figura 18

Mantener las iteraciones de 30M, pero además enviar 50 MiB por taskel hacia adelante y hacia atrás sesga la imagen nuevamente. Aquí el efecto de cola es bien visible. Worker-4 necesita esperar más tiempo para su segunda tarea que Worker-1. ¡Ahora imagina este horario con 70 trabajadores!

En caso de que las tareas sean muy ligeras desde el punto de vista computacional, pero ofrecen una cantidad notable de datos como carga útil, el cuello de botella de una sola cola compartida puede evitar cualquier beneficio adicional de agregar más trabajadores al grupo, incluso si están respaldados por núcleos físicos. En tal caso, Worker-1 podría terminar con su primera tarea y esperar una nueva incluso antes de que Worker-40 haya obtenido su primera tarea.

Ahora debería resultar obvio por qué los tiempos de cálculo en a Poolno siempre disminuyen linealmente con el número de trabajadores. El envío de cantidades relativamente grandes de datos puede conducir a escenarios en los que la mayor parte del tiempo se dedica a esperar a que los datos se copien en el espacio de direcciones de un trabajador y solo se puede alimentar a un trabajador a la vez.

4ta EJECUCIÓN: 600 millones de iteraciones y 50 MiB de datos por tarea

figura 19

Aquí enviamos 50 MiB de nuevo, pero aumentamos el número de iteraciones de 30 M a 600 M, lo que aumenta el tiempo total de cálculo de 10 sa 152 s. El horario paralelo dibujado de nuevo , está cerca de una coincidencia perfecta con el predicho, la sobrecarga a través de la copia de datos está marginada.


9. Conclusión

La multiplicación discutida por 4aumenta la flexibilidad de programación, pero también aprovecha la desigualdad en las distribuciones de tareas. Sin esta multiplicación, la cuota inactiva se limitaría a un solo trabajador incluso para iterables cortos (para DM con escenario denso). El algoritmo de chunksize de Pool necesita iterables de entrada para ser de cierto tamaño para recuperar ese rasgo.

Como se espera que esta respuesta haya demostrado, el algoritmo de tamaño de trozo de Pool conduce a una mejor utilización del núcleo en promedio en comparación con el enfoque ingenuo, al menos para el caso promedio y siempre que no se considere la sobrecarga. El algoritmo ingenuo aquí puede tener una eficiencia de distribución (DE) tan baja como ~ 51%, mientras que el algoritmo de tamaño de trozo de Pool tiene un mínimo de ~ 81%. Sin embargo, DE no incluye gastos generales de paralelización (PO) como IPC. El capítulo 8 ha demostrado que la DE todavía puede tener un gran poder de predicción para el escenario denso con gastos generales marginalizados.

A pesar del hecho de que el algoritmo de tamaño de trozo de Pool logra una DE más alta en comparación con el enfoque ingenuo, no proporciona distribuciones de tareas óptimas para cada constelación de entrada. Si bien un algoritmo de fragmentación estático simple no puede optimizar (incluida la sobrecarga) la Eficiencia de paralelización (PE), no hay una razón inherente por la que no siempre pueda proporcionar una Eficiencia de distribución relativa (RDE) del 100%, es decir, la misma DE que con chunksize=1. Un simple algoritmo de tamaño de trozos consiste solo en matemáticas básicas y es libre de "cortar el pastel" de cualquier manera.

A diferencia de la implementación de Pool de un algoritmo de "fragmentación de igual tamaño", un algoritmo de "fragmentación de tamaño uniforme" proporcionaría un RDE del 100% para todos los len_iterable/ n_workerscombinación. Un algoritmo de fragmentación de tamaño uniforme sería un poco más complicado de implementar en la fuente de Pool, pero se puede modular sobre el algoritmo existente simplemente empaquetando las tareas externamente (vincularé desde aquí en caso de que deje caer una Q / A en como hacer eso).

Darkonaut
fuente
6

Creo que parte de lo que se está perdiendo es que su estimación ingenua asume que cada unidad de trabajo requiere la misma cantidad de tiempo, en cuyo caso su estrategia sería la mejor. Pero si algunos trabajos terminan antes que otros, algunos núcleos pueden quedar inactivos esperando a que terminen los trabajos lentos.

Por lo tanto, al dividir los fragmentos en 4 veces más, si un fragmento termina antes, ese núcleo puede iniciar el siguiente fragmento (mientras que los otros núcleos siguen trabajando en su fragmento más lento).

No sé por qué eligieron el factor 4 exactamente, pero sería una compensación entre minimizar la sobrecarga del código del mapa (que quiere los fragmentos más grandes posibles) y equilibrar los fragmentos tomando diferentes cantidades de veces (lo que quiere el fragmento más pequeño posible) ).

Robar
fuente