Estoy tratando con un marco de datos Pandas bastante grande: mi conjunto de datos se asemeja a una df
configuración siguiente :
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Nota: con el fin de tener un ejemplo mínimo, puede subconjustarse fácilmente (por ejemplo, con df.loc[df['time'] <= 400, :]
), pero dado que simulo los datos de todos modos, pensé que el tamaño original daría una mejor visión general.
Para cada grupo definido por ['measurement_id', 'time', 'group']
necesito llamar a la siguiente función:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
if len( x ) >= 2:
data = np.asarray( x )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.Series( clustering.labels_ + 1, index = index )
else:
return pd.Series( np.nan, index = index )
Para mejorar el rendimiento probé dos enfoques:
Paquete Pandarallel
El primer enfoque fue paralelizar los cálculos usando el pandarallel
paquete:
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
Sin embargo, esto parece ser subóptimo ya que consume mucha RAM y no todos los núcleos se usan en los cálculos (incluso a pesar de especificar explícitamente el número de núcleos en el pandarallel.initialize()
método). Además, a veces los cálculos se terminan con varios errores, aunque no he tenido la oportunidad de encontrar una razón para eso (¿posiblemente falta de RAM?).
PySpark Pandas UDF
También probé un Spark Pandas UDF, aunque soy totalmente nuevo en Spark. Aquí está mi intento:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
if len( df['var'] ) >= 2:
data = np.asarray( df['var'] )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.DataFrame( clustering.labels_ + 1,
index = df['object']
)
else:
return pd.DataFrame( np.nan,
index = df['object']
)
res = df \
.groupBy( ['id_half', 'frame', 'team_id'] ) \
.apply( cluster ) \
.toPandas()
Desafortunadamente, el rendimiento también fue insatisfactorio, y por lo que leí sobre el tema, esto puede ser solo la carga del uso de la función UDF, escrita en Python y la necesidad asociada de convertir todos los objetos de Python en objetos Spark y viceversa.
Asi que aqui están mis preguntas:
- ¿Podría cualquiera de mis enfoques ajustarse para eliminar posibles cuellos de botella y mejorar el rendimiento? (p. ej., configuración de PySpark, ajuste de operaciones subóptimas, etc.)
- ¿Son mejores alternativas? ¿Cómo se comparan con las soluciones proporcionadas en términos de rendimiento?
dask
Respuestas:
+1
por mencionar los costos generales del complemento de configuración para cualquier estrategia de computación. Esto siempre hace un punto de equilibrio, solo después de lo cual una falta de estrategia puede lograr una alegría beneficiosa de una aceleración de dominio deseada (sin embargo, si otros, los costos típicamente de dominio permiten o siguen siendo factibles, sí, RAM). .. existencia y acceso a un dispositivo de tal tamaño, presupuesto y otras restricciones similares del mundo real)[SERIAL]
[TIME]
[SPACE]
Primero,
la verificación previa al vuelo, antes de despegar.
La nueva formulación estricta de la Ley de Amdahl es capaz de incorporar estos dos
pSO + pTO
gastos generales adicionales y los refleja al predecir los niveles de Aceleración alcanzables, incluido el punto de equilibrio. punto, ya que puede ser significativo (en un sentido de costo / efecto, eficiencia) ir en paralelo.Sin embargo,
ese no es nuestro problema central aquí .
Esto viene después:
A continuación,
dados los costos computacionales de
SpectralClustering()
, que utilizará el núcleo de la función Radial Boltzmann,~ exp( -gamma * distance( data, data )**2 )
parece que no hay avance de la división del objetodata
sobre cualquier número de unidades de trabajo disjuntas, ya que eldistance( data, data )
componente, por definición, no tiene más que visite todos losdata
elementos (ref. los costos de comunicación de las{ process | node }
topologías distribuidas de paso de valor de cualquiera a cualquier son, por razones obvias, terriblemente malas, si no los peores casos de uso para el{ process | node }
procesamiento distribuido, si no los antipatrones directos (excepto algunas telas realmente arcanas, sin memoria / sin estado, pero computacionales).Para los analistas pedantes, sí, agreguen a esto (y ya podemos decir que es un mal estado) los costos de, nuevamente , el procesamiento de k-significa cualquiera a cualquiera , aquí
O( N^( 1 + 5 * 5 ) )
va eso, paraN ~ len( data ) ~ 1.12E6+
, terriblemente en contra de nuestro deseo de tener algo Procesamiento inteligente y rápido.¿Y qué?
Mientras que los costos de instalación no se descuidan, los aumento de los costos de comunicación es casi seguro desactivar cualquier mejora del uso de los intentos anteriormente esbozada para pasar de un puro-
[SERIAL]
flujo del proceso en alguna forma de simplemente -[CONCURRENT]
o cierto-[PARALLEL]
orquestación de algunas de trabajo subunidades , debido al aumento de los gastos generales relacionados con la necesidad de implementar (un par de tándem) topologías de paso de valor de cualquiera a cualquier.Si no fuera por ellos?
Bueno, esto suena como un oxímoron de la ciencia de la computación, incluso si fuera posible, los costos de las distancias
[TIME]
precalculadas de uno a cualquier (lo que tomaría esos costos de complejidad de dominio inmenso "de antemano" (¿Dónde? ¿Cómo? ¿Hay alguna otro, latencia no evitable, que permite un posible enmascaramiento de latencia por parte de una acumulación incremental (desconocida hasta ahora) de una matriz de distancia completa en el futuro ¿cualquiera?)) pero solo reposicionaría estos costos principalmente presentes en alguna otra ubicación en[TIME]
- y[SPACE]
-Domains, no reducirlos.Hasta ahora, sé que el único es intentar, si es posible volver a formular el problema en otro, una forma problemática formulada con QUBO (ref .: Q uantum- U nocontrained- B inary- O ptimisation , una buena noticia es que las herramientas para hacerlo, una base de conocimiento de primera mano y experiencia práctica de resolución de problemas existen y crecen)
El rendimiento es impresionante: el problema formulado por QUBO tiene un
O(1)
solucionador prometedor (!) En tiempo constante (en el[TIME]
dominio) y algo restringido en el[SPACE]
dominio (donde los trucos LLNL recientemente anunciados pueden ayudar a evitar este mundo físico, la implementación actual de QPU, la restricción del problema tamaños).fuente
Esta no es una respuesta, pero ...
Si tu corres
(es decir, solo con Pandas), notará que ya está utilizando varios núcleos. Esto se debe a que
sklearn
utilizajoblib
de forma predeterminada para paralelizar el trabajo. Usted puede intercambiar el planificador a favor de Dask y tal vez obtener una mayor eficiencia durante el intercambio de los datos entre los hilos, pero siempre y cuando el trabajo que están haciendo está vinculado a la CPU como este, no habrá nada que pueda hacer para acelerarlo.En resumen, este es un problema de algoritmo: descubra lo que realmente necesita calcular, antes de tratar de considerar diferentes marcos para calcularlo.
fuente
joblib
generados , que no tienen nada que ver con los subprocesos, y menos con el intercambio? Gracias por su amable aclaración de los argumentos.No soy un experto
Dask
, pero proporciono el siguiente código como referencia:fuente