¿Qué factores determinan un chunksize
argumento óptimo para métodos como multiprocessing.Pool.map()
? El .map()
método parece utilizar una heurística arbitraria para su tamaño de trozo predeterminado (explicado a continuación); ¿Qué motiva esa elección y hay un enfoque más reflexivo basado en alguna situación o configuración particular?
Ejemplo: digamos que soy:
- Pasar un
iterable
a.map()
que tiene ~ 15 millones de elementos; - Trabajando en una máquina con 24 núcleos y usando el predeterminado
processes = os.cpu_count()
dentromultiprocessing.Pool()
.
Mi pensamiento ingenuo es dar a cada uno de los 24 trabajadores una porción del mismo tamaño, es decir, 15_000_000 / 24
625.000. Los trozos grandes deben reducir la rotación / gastos generales al tiempo que se utiliza completamente a todos los trabajadores. Pero parece que a esto le faltan algunas desventajas potenciales de dar lotes grandes a cada trabajador. ¿Es esta una imagen incompleta y qué me estoy perdiendo?
Parte de mi pregunta proviene de la lógica predeterminada para if chunksize=None
: both .map()
y .starmap()
call .map_async()
, que se ve así:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
# ... (materialize `iterable` to list if it's an iterator)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) # ????
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
¿Cuál es la lógica detrás divmod(len(iterable), len(self._pool) * 4)
? Esto implica que el tamaño de trozo estará más cerca de 15_000_000 / (24 * 4) == 156_250
. ¿Cuál es la intención de multiplicar len(self._pool)
por 4?
Esto hace que el tamaño del fragmento resultante sea un factor de 4 más pequeño que mi "lógica ingenua" de arriba, que consiste en dividir la longitud del iterable por el número de trabajadores en pool._pool
.
Por último, también hay este fragmento de los documentos de Python .imap()
que impulsa aún más mi curiosidad:
El
chunksize
argumento es el mismo que el utilizado por elmap()
método. Para iterables muy largos, el uso de un valor grande parachunksize
puede hacer que el trabajo se complete mucho más rápido que el uso del valor predeterminado de 1.
Respuesta relacionada que es útil pero un poco de alto nivel: multiprocesamiento de Python: ¿por qué los trozos grandes son más lentos? .
4
Es arbitrario y todo el cálculo de chunksize es heurístico. El factor relevante es cuánto puede variar su tiempo de procesamiento real. Un poco más sobre esto aquí hasta que tenga tiempo para una respuesta si todavía es necesario.Respuestas:
Respuesta corta
El algoritmo de chunksize de Pool es heurístico. Proporciona una solución sencilla para todos los escenarios de problemas imaginables que está intentando introducir en los métodos de Pool. Como consecuencia, no se puede optimizar para ningún escenario específico .
El algoritmo divide arbitrariamente el iterable en aproximadamente cuatro veces más partes que el enfoque ingenuo. Más fragmentos significan más gastos generales, pero una mayor flexibilidad de programación. Cómo se mostrará esta respuesta, esto conduce a una mayor utilización de los trabajadores en promedio, pero sin la garantía de un tiempo de cálculo general más corto para cada caso.
"Es bueno saberlo", podría pensar, "pero ¿cómo me ayuda saber esto con mis problemas concretos de multiprocesamiento?" Bueno, no es así. La respuesta corta más honesta es "no hay una respuesta corta", "el multiprocesamiento es complejo" y "depende". Un síntoma observado puede tener diferentes raíces, incluso para escenarios similares.
Esta respuesta intenta brindarle conceptos básicos que lo ayudarán a obtener una imagen más clara de la caja negra de programación de Pool. También intenta brindarle algunas herramientas básicas a mano para reconocer y evitar posibles acantilados en la medida en que estén relacionados con el tamaño del trozo.
Tabla de contenido
Parte I
Cuantificación de la eficiencia del algoritmo
6.1 Modelos
6.2 Programación paralela
6.3 Eficiencias
6.3.1 Eficiencia de distribución absoluta (ADE)
6.3.2 Eficiencia de distribución relativa (RDE)
Parte II
Primero es necesario aclarar algunos términos importantes.
1. Definiciones
Pedazo
Un fragmento aquí es una parte del
iterable
argumento especificado en una llamada al método pool. Cómo se calcula el tamaño del trozo y qué efectos puede tener, es el tema de esta respuesta.Tarea
La representación física de una tarea en un proceso de trabajo en términos de datos se puede ver en la siguiente figura.
La figura muestra una llamada de ejemplo a
pool.map()
, mostrada a lo largo de una línea de código, tomada de lamultiprocessing.pool.worker
función, dondeinqueue
se desempaqueta una tarea leída de .worker
es la función principal subyacente enMainThread
un proceso de trabajo de grupo. Elfunc
-argumento especificado en el método pool solo coincidirá con lafunc
-variable dentro de la funciónworker
-para métodos de llamada única comoapply_async
y paraimap
conchunksize=1
. Para el resto de los métodos de grupo con unchunksize
parámetro, la función de procesamientofunc
será una función de mapeador (mapstar
ostarmapstar
). Esta función asigna elfunc
parámetro especificado por el usuario en cada elemento del fragmento transmitido del iterable (-> "tareas de mapa"). El tiempo que toma, define una tareatambién como unidad de trabajo .Taskel
Si bien el uso de la palabra "tarea" para todo el procesamiento de un fragmento se corresponde con el código interno
multiprocessing.pool
, no hay ninguna indicación de cómo una sola llamada al usuario especificadofunc
, con un elemento del fragmento como argumento (s), debe ser referido a. Para evitar la confusión que surge de los conflictos de nombres (piense en el parámetro -paramaxtasksperchild
el__init__
método de Pool ), esta respuesta se referirá a las unidades individuales de trabajo dentro de una tarea como taskel .Sobrecarga de paralelización (PO)
PO consta de gastos generales internos de Python y gastos generales para la comunicación entre procesos (IPC). La sobrecarga por tarea dentro de Python viene con el código necesario para empaquetar y desempaquetar las tareas y sus resultados. IPC-overhead viene con la necesaria sincronización de hilos y la copia de datos entre diferentes espacios de direcciones (se necesitan dos pasos de copia: padre -> cola -> hijo). La cantidad de sobrecarga de IPC depende del tamaño de los datos, el hardware y el sistema operativo, lo que dificulta las generalizaciones sobre el impacto.
2. Objetivos de paralelización
Cuando usamos multiprocesamiento, nuestro objetivo general (obviamente) es minimizar el tiempo total de procesamiento para todas las tareas. Para alcanzar este objetivo general, nuestro objetivo técnico debe ser optimizar la utilización de los recursos de hardware .
Algunos subobjetivos importantes para lograr el objetivo técnico son:
Al principio, las tareas deben ser lo suficientemente pesadas (intensivas) computacionalmente, para recuperar el PO que tenemos que pagar por la paralelización. La relevancia de PO disminuye al aumentar el tiempo de cálculo absoluto por tarea. O, para decirlo al revés, cuanto mayor sea el tiempo de cálculo absoluto por tarea para su problema, menos relevante será la necesidad de reducir el PO. Si su cálculo tomará horas por tarea, la sobrecarga de IPC será insignificante en comparación. La principal preocupación aquí es evitar que los procesos de trabajo inactivos después de que se hayan distribuido todas las tareas. Mantener todos los núcleos cargados significa que estamos paralelizando tanto como sea posible.
3. Escenarios de paralelización
El factor principal en cuestión es cuánto tiempo de cálculo puede variar entre nuestras tareas individuales. Para nombrarlo, la elección de un tamaño de trozo óptimo está determinada por el coeficiente de variación ( CV ) para los tiempos de cálculo por tarea.
Los dos escenarios extremos en una escala, según el alcance de esta variación son:
Para una mejor memorización, me referiré a estos escenarios como:
Escenario denso
En un escenario denso , sería deseable distribuir todas las tareas a la vez, para mantener la IPC necesaria y el cambio de contexto al mínimo. Esto significa que queremos crear solo la cantidad de fragmentos, la cantidad de procesos de trabajo que haya. Como ya se dijo anteriormente, el peso de PO aumenta con tiempos de cálculo más cortos por tarea.
Para un rendimiento máximo, también queremos que todos los procesos de trabajo estén ocupados hasta que se procesen todas las tareas (sin trabajadores inactivos). Para este objetivo, los fragmentos distribuidos deben ser del mismo tamaño o cerca de.
Escenario amplio
El mejor ejemplo de un escenario amplio sería un problema de optimización, donde los resultados convergen rápidamente o el cálculo puede llevar horas, si no días. Por lo general, no es predecible qué combinación de "tareas ligeras" y "tareas pesadas" contendrá una tarea en tal caso, por lo que no es aconsejable distribuir demasiadas tareas en un lote de tareas a la vez. Distribuir menos tareas a la vez de lo posible significa aumentar la flexibilidad de programación. Esto es necesario aquí para alcanzar nuestro subobjetivo de una alta utilización de todos los núcleos.
Si los
Pool
métodos, por defecto, estuvieran totalmente optimizados para el escenario denso, crearían cada vez más tiempos subóptimos para cada problema ubicado más cerca del escenario amplio.4. Riesgos de Chunksize> 1
Considere este ejemplo simplificado de pseudocódigo de un Escenario Amplio -iterable, que queremos pasar a un método de grupo:
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
En lugar de los valores reales, pretendemos ver el tiempo de cálculo necesario en segundos, por simplicidad solo 1 minuto o 1 día. Suponemos que el grupo tiene cuatro procesos de trabajo (en cuatro núcleos) y
chunksize
está configurado en2
. Debido a que se mantendrá el orden, los fragmentos enviados a los trabajadores serán los siguientes:[(60, 60), (86400, 60), (86400, 60), (60, 84600)]
Como tenemos suficientes trabajadores y el tiempo de cálculo es lo suficientemente alto, podemos decir que cada proceso de trabajo tendrá una parte en la que trabajar en primer lugar. (Este no tiene por qué ser el caso para completar tareas rápidamente). Además, podemos decir que todo el procesamiento tomará aproximadamente 86400 + 60 segundos, porque ese es el tiempo de cálculo total más alto para un fragmento en este escenario artificial y distribuimos fragmentos solo una vez.
Ahora considere este iterable, que tiene solo un elemento cambiando su posición en comparación con el iterable anterior:
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
... y los trozos correspondientes:
[(60, 60), (86400, 86400), (60, 60), (60, 84600)]
¡Solo mala suerte con la clasificación de nuestro iterable casi duplicó (86400 + 86400) nuestro tiempo total de procesamiento! El trabajador que recibe el fragmento vicioso (86400, 86400) está bloqueando el segundo taskel pesado en su tarea para que no se distribuya a uno de los trabajadores inactivos que ya terminaron con sus (60, 60) fragmentos. Obviamente, no nos arriesgaríamos a un resultado tan desagradable si nos dispusiéramos
chunksize=1
.Este es el riesgo de trozos más grandes. Con tamaños más altos, cambiamos la flexibilidad de programación por menos gastos generales y, en casos como el anterior, eso es un mal negocio.
Cómo lo veremos en el capítulo 6. Cuantificación de la eficiencia del algoritmo , los fragmentos más grandes también pueden conducir a resultados subóptimos para los escenarios densos .
5. Algoritmo de trozos de pool
A continuación, encontrará una versión ligeramente modificada del algoritmo dentro del código fuente. Como puede ver, corté la parte inferior y la envolví en una función para calcular el
chunksize
argumento externamente. También reemplacé4
con unfactor
parámetro y subcontraté laslen()
llamadas.# mp_utils.py def calc_chunksize(n_workers, len_iterable, factor=4): """Calculate chunksize argument for Pool-methods. Resembles source-code within `multiprocessing.pool.Pool._map_async`. """ chunksize, extra = divmod(len_iterable, n_workers * factor) if extra: chunksize += 1 return chunksize
Para asegurarnos de que todos estamos en la misma página, esto es lo que
divmod
hace:divmod(x, y)
es una función incorporada que regresa(x//y, x%y)
.x // y
es la división de piso, que devuelve el cociente redondeado hacia abajo dex / y
, mientras quex % y
es la operación de módulo que devuelve el resto dex / y
. De ahí, por ejemplo,divmod(10, 3)
devoluciones(3, 1)
.Ahora, cuando nos fijamos en
chunksize, extra = divmod(len_iterable, n_workers * 4)
, se dará cuenta den_workers
que aquí es el divisory
enx / y
y por la multiplicación4
, sin un ajuste adicional a través deif extra: chunksize +=1
más adelante, conduce a una chunksize inicial de al menos cuatro veces más pequeño (porlen_iterable >= n_workers * 4
) de lo que sería de otra manera.Para ver el efecto de la multiplicación por
4
en el resultado de tamaño de fragmento intermedio, considere esta función:def compare_chunksizes(len_iterable, n_workers=4): """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize for Pool's complete algorithm. Return chunksizes and the real factors by which naive chunksizes are bigger. """ cs_naive = len_iterable // n_workers or 1 # naive approach cs_pool1 = len_iterable // (n_workers * 4) or 1 # incomplete pool algo. cs_pool2 = calc_chunksize(n_workers, len_iterable) real_factor_pool1 = cs_naive / cs_pool1 real_factor_pool2 = cs_naive / cs_pool2 return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
La función anterior calcula el tamaño de trozo ingenuo (
cs_naive
) y el tamaño de trozo del primer paso del algoritmo de tamaño de trozo (cs_pool1
) de Pool , así como el tamaño de trozo del algoritmo de grupo completo (cs_pool2
). Además, calcula los factores realesrf_pool1 = cs_naive / cs_pool1
yrf_pool2 = cs_naive / cs_pool2
, que nos dicen cuántas veces los tamaños de trozos calculados ingenuamente son más grandes que las versiones internas de Pool.A continuación, verá dos figuras creadas con la salida de esta función. La figura de la izquierda solo muestra los tamaños de trozos
n_workers=4
hasta una longitud iterable de500
. La figura de la derecha muestra los valores derf_pool1
. Para longitudes iterables16
, el factor real se convierte en>=4
(paralen_iterable >= n_workers * 4
) y su valor máximo es7
para longitudes iterables28-31
. Esa es una desviación masiva del factor original4
al que converge el algoritmo para iterables más largos. "Más tiempo" aquí es relativo y depende del número de trabajadores especificados.Recuerde que chunksize
cs_pool1
todavía carece delextra
ajuste -con el resto deldivmod
contenidocs_pool2
del algoritmo completo.El algoritmo continúa con:
if extra: chunksize += 1
Ahora bien, en los casos estaban allí es un resto (una
extra
de la DIVMOD-operación), el aumento de la chunksize por 1, obviamente, no puede trabajar para cada tarea. Después de todo, si así fuera, para empezar, no quedaría un resto.Como se puede ver en las figuras siguientes, el " extra-tratamiento " tiene el efecto, que el verdadero factor de
rf_pool2
ahora converge hacia4
desde abajo4
y la desviación es un poco más suave. Desviación estándar paran_workers=4
ylen_iterable=500
cae de0.5233
pararf_pool1
a0.4115
pararf_pool2
.Eventualmente, aumentar
chunksize
en 1 tiene el efecto de que la última tarea transmitida solo tenga un tamaño delen_iterable % chunksize or chunksize
.Sin embargo , el efecto más interesante y como veremos más adelante, más consecuente, del tratamiento adicional se puede observar para la cantidad de fragmentos generados (
n_chunks
). Para iterables lo suficientemente largos, el algoritmo de tamaño de trozos completo de Pool (n_pool2
en la figura siguiente) estabilizará el número de trozos enn_chunks == n_workers * 4
. Por el contrario, el algoritmo ingenuo (después de un eructo inicial) sigue alternando entren_chunks == n_workers
y an_chunks == n_workers + 1
medida que crece la duración del iterable.A continuación, encontrará dos funciones de información mejoradas para Pool y el ingenuo algoritmo chunksize. La salida de estas funciones será necesaria en el próximo capítulo.
# mp_utils.py from collections import namedtuple Chunkinfo = namedtuple( 'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks', 'chunksize', 'last_chunk'] ) def calc_chunksize_info(n_workers, len_iterable, factor=4): """Calculate chunksize numbers.""" chunksize, extra = divmod(len_iterable, n_workers * factor) if extra: chunksize += 1 # `+ (len_iterable % chunksize > 0)` exploits that `True == 1` n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0) # exploit `0 == False` last_chunk = len_iterable % chunksize or chunksize return Chunkinfo( n_workers, len_iterable, n_chunks, chunksize, last_chunk )
No se confunda por el aspecto probablemente inesperado de
calc_naive_chunksize_info
. Elextra
desdedivmod
no se utiliza para calcular el tamaño del fragmento.def calc_naive_chunksize_info(n_workers, len_iterable): """Calculate naive chunksize numbers.""" chunksize, extra = divmod(len_iterable, n_workers) if chunksize == 0: chunksize = 1 n_chunks = extra last_chunk = chunksize else: n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0) last_chunk = len_iterable % chunksize or chunksize return Chunkinfo( n_workers, len_iterable, n_chunks, chunksize, last_chunk )
6. Cuantificación de la eficiencia del algoritmo
Ahora, después de haber visto cómo la salida del
Pool
algoritmo de tamaño de chunksize se ve diferente en comparación con la salida del algoritmo ingenuo ...Como se muestra en el capítulo anterior, para iterables más largos (una mayor cantidad de tareas), el algoritmo de tamaño de trozos de Pool divide aproximadamente el iterable en cuatro veces más partes que el método ingenuo. Los fragmentos más pequeños significan más tareas y más tareas significan más gastos indirectos de paralelización (PO) , un costo que debe sopesarse con el beneficio de una mayor flexibilidad de programación (recuerde "Riesgos de tamaño de fragmento> 1" ).
Por razones bastante obvias, el algoritmo de chunksize básico de Pool no puede sopesar la flexibilidad de programación con la PO para nosotros. La sobrecarga de IPC depende del tamaño de los datos, el hardware y el sistema operativo. El algoritmo no puede saber en qué hardware ejecutamos nuestro código, ni tiene idea de cuánto tardará una tarea en completarse. Es una heurística que proporciona una funcionalidad básica para todos los escenarios posibles. Esto significa que no se puede optimizar para ningún escenario en particular. Como se mencionó anteriormente, PO también se vuelve cada vez menos preocupante con el aumento de los tiempos de cálculo por tarea (correlación negativa).
Cuando recuerde los Objetivos de Paralelización del capítulo 2, un punto fue:
El anteriormente mencionado algo , chunksize-algoritmo de piscina puede tratar de mejorar es la minimización de ralentí trabajadores-procesos , respectivamente, la utilización de la CPU-cores .
Las
multiprocessing.Pool
personas que se preguntan acerca de los núcleos no utilizados / procesos de trabajo inactivos hacen una pregunta repetida sobre SO con respecto a situaciones en las que se esperaría que todos los procesos de trabajo estén ocupados. Si bien esto puede tener muchas razones, los procesos de trabajo inactivos hacia el final de un cálculo son una observación que a menudo podemos hacer, incluso con Escenarios densos (tiempos de cálculo iguales por tarea) en los casos en que el número de trabajadores no es un divisor del número. de trozos (n_chunks % n_workers > 0
).La pregunta ahora es:
6.1 Modelos
Para obtener información más profunda aquí, necesitamos una forma de abstracción de cálculos paralelos que simplifique la realidad demasiado compleja hasta un grado manejable de complejidad, al tiempo que conserva la importancia dentro de los límites definidos. Tal abstracción se llama modelo . Una implementación de tal " Modelo de Paralelización" (PM) genera metadatos mapeados por el trabajador (marcas de tiempo) como lo harían los cálculos reales, si los datos fueran recopilados. Los metadatos generados por el modelo permiten predecir métricas de cálculos paralelos bajo ciertas restricciones.
Uno de los dos submodelos dentro del PM definido aquí es el Modelo de Distribución (DM) . El DM explica cómo las unidades atómicas de trabajo (taskels) se distribuyen entre los trabajadores paralelos y el tiempo , cuando no se consideran otros factores que el algoritmo chunksize-size respectivo, el número de trabajadores, el input-iterable (número de taskels) y su duración de cálculo. . Esto significa que no se incluye ningún tipo de gastos generales .
Para obtener un MP completo , el DM se amplía con un Modelo de Sobrecarga (OM) , que representa varias formas de Sobrecarga de Paralelización (PO) . Dicho modelo debe calibrarse para cada nodo individualmente (dependencias de hardware, SO). Se deja abierta la cantidad de formas de sobrecarga representadas en un OM y, por lo tanto, pueden existir múltiples OM con diversos grados de complejidad. El nivel de precisión que necesita el OM implementado está determinado por el peso total de PO para el cálculo específico. Las tareas más cortas conducen a un mayor peso de PO , lo que a su vez requiere un OM más precisosi estuviéramos intentando predecir las Eficiencias de Paralelización (PE) .
6.2 Programación paralela (PS)
El programa paralelo es una representación bidimensional del cálculo paralelo, donde el eje x representa el tiempo y el eje y representa un grupo de trabajadores paralelos. El número de trabajadores y el tiempo total de cálculo marcan la extensión de un rectángulo, en el que se dibujan rectángulos más pequeños. Estos rectángulos más pequeños representan unidades atómicas de trabajo (taskels).
A continuación, encontrará la visualización de un PS extraído con datos del DM del algoritmo de tamaño de grupo para el escenario denso .
Los nombres de las partes compuestas se pueden ver en la siguiente imagen.
En un PM completo que incluye un OM , Idling Share no se limita a la cola, sino que también comprende el espacio entre tareas e incluso entre tareas.
6.3 Eficiencias
Los modelos presentados anteriormente permiten cuantificar la tasa de utilización de los trabajadores. Podemos distinguir:
Es importante tener en cuenta que las eficiencias calculadas no se correlacionan automáticamente con un cálculo general más rápido para un problema de paralelización determinado. La utilización del trabajador en este contexto solo distingue entre un trabajador que tiene una tarea iniciada pero no terminada y un trabajador que no tiene una tarea tan "abierta". Eso significa que no se registra la posible inactividad durante el período de tiempo de un taskel .
Todas las eficiencias mencionadas anteriormente se obtienen básicamente mediante el cálculo del cociente de la división Ocupado Compartido / Horario Paralelo . La diferencia entre DE y PE viene con el Ocupado ocupando una porción más pequeña del Programa Paralelo general para el MP extendido. .
Esta respuesta solo discutirá un método simple para calcular DE para el escenario denso. Esto es lo suficientemente adecuado para comparar diferentes algoritmos de tamaño de trozo, ya que ...
6.3.1 Eficiencia de distribución absoluta (ADE)
Esta eficiencia básica se puede calcular en general dividiendo la participación ocupada entre todo el potencial del horario paralelo :
Para el escenario denso , el código de cálculo simplificado se ve así:
# mp_utils.py def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk): """Calculate Absolute Distribution Efficiency (ADE). `len_iterable` is not used, but contained to keep a consistent signature with `calc_rde`. """ if n_workers == 1: return 1 potential = ( ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize) + (n_chunks % n_workers == 1) * last_chunk ) * n_workers n_full_chunks = n_chunks - (chunksize > last_chunk) taskels_in_regular_chunks = n_full_chunks * chunksize real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk ade = real / potential return ade
Si no hay cuota inactiva , la cuota ocupada será igual a la programación paralela , por lo que obtenemos un ADE del 100%. En nuestro modelo simplificado, este es un escenario en el que todos los procesos disponibles estarán ocupados durante todo el tiempo necesario para procesar todas las tareas. En otras palabras, todo el trabajo se paraleliza efectivamente al 100 por ciento.
Pero, ¿por qué sigo refiriéndome a la educación física como educación física absoluta aquí?
Para comprender eso, tenemos que considerar un caso posible para el tamaño de chunksize (cs) que garantiza la máxima flexibilidad de programación (también, el número de montañeses que puede haber. ¿Coincidencia?):
Si, por ejemplo, tenemos cuatro procesos de trabajo y 37 tareas, habrá trabajadores inactivos incluso con
chunksize=1
, solo porquen_workers=4
no es un divisor de 37. El resto de la división 37/4 es 1. Esta única tarea restante tendrá que ser procesado por un solo trabajador, mientras que los tres restantes están inactivos.Del mismo modo, todavía habrá un trabajador inactivo con 39 tareas, como puede ver en la imagen de abajo.
Cuando compare el Programa paralelo superior para
chunksize=1
con la versión siguiente parachunksize=3
, notará que el Programa paralelo superior es más pequeño, la línea de tiempo en el eje x más corta. Debería ser obvio ahora, cómo los fragmentos más grandes de forma inesperada también pueden conducir a un aumento de los tiempos de cálculo generales, incluso para los escenarios densos .Porque los gastos generales no están contenidos en este modelo. Será diferente para ambos tamaños de trozos, por lo que el eje x no es realmente comparable directamente. La sobrecarga aún puede conducir a un tiempo total de cálculo más largo, como se muestra en el caso 2 de la figura siguiente.
6.3.2 Eficiencia de distribución relativa (RDE)
El valor de ADE no contiene la información si es posible una mejor distribución de las tareas con chunksize establecido en 1. Mejor aquí todavía significa una participación inactiva más pequeña .
Para obtener un valor DE ajustado por la máxima DE posible , tenemos que dividir el ADE considerado entre el ADE que obtenemos
chunksize=1
.Así es como se ve esto en el código:
# mp_utils.py def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk): """Calculate Relative Distribution Efficiency (RDE).""" ade_cs1 = calc_ade( n_workers, len_iterable, n_chunks=len_iterable, chunksize=1, last_chunk=1 ) ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk) rde = ade / ade_cs1 return rde
RDE , como se define aquí, en esencia es un cuento sobre la cola de un Programa Paralelo . RDE está influenciado por el tamaño de trozo máximo efectivo contenido en la cola. (Esta cola puede tener la longitud del eje x
chunksize
olast_chunk
). Esto tiene la consecuencia de que el RDE converge naturalmente al 100% (uniforme) para todo tipo de "apariencia de cola" como se muestra en la figura siguiente.Un RDE bajo ...
Encuentre la Parte II de esta respuesta aquí .
fuente
7. Algoritmo de tamaño ingenuo vs.
Antes de entrar en detalles, considere los dos gifs a continuación. Para un rango de diferentes
iterable
longitudes, muestran cómo los dos algoritmos comparados fragmentan el pasadoiterable
(será una secuencia para entonces) y cómo se podrían distribuir las tareas resultantes. El orden de los trabajadores es aleatorio y el número de tareas distribuidas por trabajador en realidad puede diferir de estas imágenes para tareas ligeras o tareas en un escenario amplio. Como se mencionó anteriormente, los gastos generales tampoco se incluyen aquí. Sin embargo, para tareas lo suficientemente pesadas en un escenario denso con tamaños de datos transmitidos despreciables, los cálculos reales dibujan una imagen muy similar.Como se muestra en el capítulo " 5. Chunksize-Algorithm de Pool", con el algoritmo de Chunksize de Pool, el número de fragmentos se estabilizará
n_chunks == n_workers * 4
para iterables suficientemente grandes, mientras sigue cambiando entren_chunks == n_workers
yn_chunks == n_workers + 1
con el enfoque ingenuo. Para el algoritmo ingenuo se aplica: Porquen_chunks % n_workers == 1
esTrue
paran_chunks == n_workers + 1
, se creará una nueva sección donde solo se empleará un solo trabajador.A continuación, verá una figura similar a la que se muestra en el capítulo 5, pero que muestra el número de secciones en lugar del número de fragmentos. Para el algoritmo de tamaño completo de Pool (
n_pool2
),n_sections
se estabilizará en el infame factor codificado4
. Para el algoritmo ingenuo,n_sections
se alternará entre uno y dos.Para el algoritmo de chunksize de Pool, la estabilización a
n_chunks = n_workers * 4
través del tratamiento adicional mencionado anteriormente , evita la creación de una nueva sección aquí y mantiene la cuota inactiva limitada a un trabajador durante iterables lo suficientemente largos. No solo eso, sino que el algoritmo seguirá reduciendo el tamaño relativo de la cuota inactiva , lo que conduce a un valor de RDE que converge hacia el 100%."Lo suficientemente largo" para
n_workers=4
es,len_iterable=210
por ejemplo. Para iterables iguales o más grandes que eso, la participación inactiva se limitará a un trabajador, un rasgo que originalmente se perdió debido a la4
-multiplicación dentro del algoritmo chunksize-en primer lugar.El ingenuo algoritmo chunksize también converge hacia el 100%, pero lo hace más lento. El efecto de convergencia depende únicamente del hecho de que la parte relativa de la cola se contrae en los casos en que habrá dos secciones. Esta cola con un solo trabajador empleado se limita a la longitud del eje x
n_workers - 1
, el resto máximo posible paralen_iterable / n_workers
.A continuación, encontrará dos mapas de calor que muestran los valores de RDE para todas las longitudes iterables hasta 5000, para todos los números de trabajadores desde 2 hasta 100. La escala de colores va de 0,5 a 1 (50% -100%). Notará áreas mucho más oscuras (valores RDE más bajos) para el algoritmo ingenuo en el mapa de calor de la izquierda. Por el contrario, el algoritmo de tamaño de trozo de Pool a la derecha dibuja una imagen mucho más brillante.
El gradiente diagonal de las esquinas oscuras de la parte inferior izquierda frente a las esquinas brillantes de la parte superior derecha muestra nuevamente la dependencia del número de trabajadores para lo que se puede llamar un "iterable largo".
Con el algoritmo de tamaño de trozo de Pool, un valor de RDE de 81,25% es el valor más bajo para el rango de trabajadores y longitudes iterables especificadas anteriormente:
Con el ingenuo algoritmo chunksize, las cosas pueden empeorar mucho. El RDE calculado más bajo aquí es 50,72%. En este caso, ¡casi la mitad del tiempo de cálculo solo se ejecuta un trabajador! Así que, cuidado, orgullosos propietarios de Knights Landing . ;)
8. Verificación de la realidad
En los capítulos anteriores consideramos un modelo simplificado para el problema de distribución puramente matemático, despojado de los detalles esenciales que hacen del multiprocesamiento un tema tan espinoso en primer lugar. Para comprender mejor hasta qué punto el modelo de distribución (DM) por sí solo puede contribuir a explicar la utilización observada de los trabajadores en la realidad, ahora veremos algunas programaciones paralelas dibujadas mediante cálculos reales .
Preparar
Las siguientes gráficas tratan todas con ejecuciones paralelas de una función ficticia simple, unida a la CPU, que se llama con varios argumentos para que podamos observar cómo la Programación paralela dibujada varía en dependencia de los valores de entrada. El "trabajo" dentro de esta función consiste solo en la iteración sobre un objeto de rango. Esto ya es suficiente para mantener un núcleo ocupado, ya que pasamos grandes números. Opcionalmente, la función toma un extra exclusivo de la tarea
data
que simplemente se devuelve sin cambios. Dado que cada taskel comprende exactamente la misma cantidad de trabajo, todavía estamos tratando con un escenario denso aquí.La función está decorada con un contenedor que toma marcas de tiempo con resolución ns (Python 3.7+). Las marcas de tiempo se utilizan para calcular el intervalo de tiempo de un taskel y, por lo tanto, permiten dibujar un horario paralelo empírico.
@stamp_taskel def busy_foo(i, it, data=None): """Dummy function for CPU-bound work.""" for _ in range(int(it)): pass return i, data def stamp_taskel(func): """Decorator for taking timestamps on start and end of decorated function execution. """ @wraps(func) def wrapper(*args, **kwargs): start_time = time_ns() result = func(*args, **kwargs) end_time = time_ns() return (current_process().name, (start_time, end_time)), result return wrapper
El método de mapa de estrellas de Pool también está decorado de tal manera que solo se cronometra la llamada de mapa de estrellas. El "inicio" y el "final" de esta llamada determinan el mínimo y el máximo en el eje x del programa paralelo producido.
Vamos a observar el cálculo de 40 tareas en cuatro procesos de trabajo en una máquina con estas especificaciones: Python 3.7.1, Ubuntu 18.04.2, CPU Intel® Core ™ i7-2600K @ 3.40GHz × 8
Los valores de entrada que se variarán son el número de iteraciones en el bucle for (30k, 30M, 600M) y el tamaño de los datos de envío adicional (por taskel, numpy-ndarray: 0 MiB, 50 MiB).
... N_WORKERS = 4 LEN_ITERABLE = 40 ITERATIONS = 30e3 # 30e6, 600e6 DATA_MiB = 0 # 50 iterable = [ # extra created data per taskel (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8))) # taskel args for i in range(LEN_ITERABLE) ] with Pool(N_WORKERS) as pool: results = pool.starmap(busy_foo, iterable)
Las ejecuciones que se muestran a continuación se seleccionaron a dedo para tener el mismo orden de fragmentos, de modo que pueda detectar mejor las diferencias en comparación con la programación paralela del modelo de distribución, pero no olvide que el orden en el que los trabajadores realizan su tarea no es determinista.
Predicción DM
Para reiterar, el modelo de distribución "predice" una programación paralela como ya lo hemos visto antes en el capítulo 6.2:
1a EJECUCIÓN: 30k iteraciones y 0 datos MiB por tarea
Nuestra primera ejecución aquí es muy corta, las tareas son muy "ligeras". La
pool.starmap()
llamada completa solo tomó 14,5 ms en total. Notarás que, al contrario que con el DM , el ralentí no se limita a la sección de cola, sino que también tiene lugar entre tareas e incluso entre tareas. Eso es porque nuestro horario real aquí incluye naturalmente todo tipo de gastos generales. Ralentí aquí significa todo lo que está fuera de un taskel. Posible verdadera ralentí durante un taskel no se captura la forma ya mencionada antes.Además, puede ver que no todos los trabajadores realizan sus tareas al mismo tiempo. Eso se debe al hecho de que todos los trabajadores se alimentan de un elemento compartido
inqueue
y solo un trabajador puede leerlo a la vez. Lo mismo se aplica aoutqueue
. Esto puede causar mayores problemas tan pronto como esté transmitiendo tamaños de datos no marginales, como veremos más adelante.Además, puede ver que, a pesar de que cada taskel comprende la misma cantidad de trabajo, el intervalo de tiempo real medido para un taskel varía mucho. Las tareas distribuidas a worker-3 y worker-4 necesitan más tiempo que las procesadas por los dos primeros trabajadores. Para esta ejecución, sospecho que se debe a que el turbo boost ya no está disponible en los núcleos para worker-3/4 en ese momento, por lo que procesaron sus tareas con una frecuencia de reloj más baja.
Todo el cálculo es tan ligero que los factores de caos introducidos por el hardware o el sistema operativo pueden sesgar el PS drásticamente. El cálculo es una "hoja en el viento" y la predicción DM tiene poca importancia, incluso para un escenario teóricamente adecuado.
2da EJECUCIÓN: 30 millones de iteraciones y 0 datos MiB por tarea
Aumentar el número de iteraciones en el bucle for de 30.000 a 30 millones da como resultado un Programa Paralelo real que se acerca a una coincidencia perfecta con el predicho por los datos proporcionados por el DM , ¡hurra! El cálculo por tarea ahora es lo suficientemente pesado como para marginar las partes inactivas al principio y en el medio, dejando solo visible la gran parte inactiva que predijo el DM .
3a EJECUCIÓN: 30 millones de iteraciones y 50 MiB de datos por tarea
Mantener las iteraciones de 30M, pero además enviar 50 MiB por taskel hacia adelante y hacia atrás sesga la imagen nuevamente. Aquí el efecto de cola es bien visible. Worker-4 necesita esperar más tiempo para su segunda tarea que Worker-1. ¡Ahora imagina este horario con 70 trabajadores!
En caso de que las tareas sean muy ligeras desde el punto de vista computacional, pero ofrecen una cantidad notable de datos como carga útil, el cuello de botella de una sola cola compartida puede evitar cualquier beneficio adicional de agregar más trabajadores al grupo, incluso si están respaldados por núcleos físicos. En tal caso, Worker-1 podría terminar con su primera tarea y esperar una nueva incluso antes de que Worker-40 haya obtenido su primera tarea.
Ahora debería resultar obvio por qué los tiempos de cálculo en a
Pool
no siempre disminuyen linealmente con el número de trabajadores. El envío de cantidades relativamente grandes de datos puede conducir a escenarios en los que la mayor parte del tiempo se dedica a esperar a que los datos se copien en el espacio de direcciones de un trabajador y solo se puede alimentar a un trabajador a la vez.4ta EJECUCIÓN: 600 millones de iteraciones y 50 MiB de datos por tarea
Aquí enviamos 50 MiB de nuevo, pero aumentamos el número de iteraciones de 30 M a 600 M, lo que aumenta el tiempo total de cálculo de 10 sa 152 s. El horario paralelo dibujado de nuevo , está cerca de una coincidencia perfecta con el predicho, la sobrecarga a través de la copia de datos está marginada.
9. Conclusión
La multiplicación discutida por
4
aumenta la flexibilidad de programación, pero también aprovecha la desigualdad en las distribuciones de tareas. Sin esta multiplicación, la cuota inactiva se limitaría a un solo trabajador incluso para iterables cortos (para DM con escenario denso). El algoritmo de chunksize de Pool necesita iterables de entrada para ser de cierto tamaño para recuperar ese rasgo.Como se espera que esta respuesta haya demostrado, el algoritmo de tamaño de trozo de Pool conduce a una mejor utilización del núcleo en promedio en comparación con el enfoque ingenuo, al menos para el caso promedio y siempre que no se considere la sobrecarga. El algoritmo ingenuo aquí puede tener una eficiencia de distribución (DE) tan baja como ~ 51%, mientras que el algoritmo de tamaño de trozo de Pool tiene un mínimo de ~ 81%. Sin embargo, DE no incluye gastos generales de paralelización (PO) como IPC. El capítulo 8 ha demostrado que la DE todavía puede tener un gran poder de predicción para el escenario denso con gastos generales marginalizados.
A pesar del hecho de que el algoritmo de tamaño de trozo de Pool logra una DE más alta en comparación con el enfoque ingenuo, no proporciona distribuciones de tareas óptimas para cada constelación de entrada. Si bien un algoritmo de fragmentación estático simple no puede optimizar (incluida la sobrecarga) la Eficiencia de paralelización (PE), no hay una razón inherente por la que no siempre pueda proporcionar una Eficiencia de distribución relativa (RDE) del 100%, es decir, la misma DE que con
chunksize=1
. Un simple algoritmo de tamaño de trozos consiste solo en matemáticas básicas y es libre de "cortar el pastel" de cualquier manera.A diferencia de la implementación de Pool de un algoritmo de "fragmentación de igual tamaño", un algoritmo de "fragmentación de tamaño uniforme" proporcionaría un RDE del 100% para todos los
len_iterable
/n_workers
combinación. Un algoritmo de fragmentación de tamaño uniforme sería un poco más complicado de implementar en la fuente de Pool, pero se puede modular sobre el algoritmo existente simplemente empaquetando las tareas externamente (vincularé desde aquí en caso de que deje caer una Q / A en como hacer eso).fuente
Creo que parte de lo que se está perdiendo es que su estimación ingenua asume que cada unidad de trabajo requiere la misma cantidad de tiempo, en cuyo caso su estrategia sería la mejor. Pero si algunos trabajos terminan antes que otros, algunos núcleos pueden quedar inactivos esperando a que terminen los trabajos lentos.
Por lo tanto, al dividir los fragmentos en 4 veces más, si un fragmento termina antes, ese núcleo puede iniciar el siguiente fragmento (mientras que los otros núcleos siguen trabajando en su fragmento más lento).
No sé por qué eligieron el factor 4 exactamente, pero sería una compensación entre minimizar la sobrecarga del código del mapa (que quiere los fragmentos más grandes posibles) y equilibrar los fragmentos tomando diferentes cantidades de veces (lo que quiere el fragmento más pequeño posible) ).
fuente