Aplicando la función Python a Pandas DataFrame agrupado: ¿cuál es el enfoque más eficiente para acelerar los cálculos?

9

Estoy tratando con un marco de datos Pandas bastante grande: mi conjunto de datos se asemeja a una dfconfiguración siguiente :

import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used

#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
         { 'measurement_id':        np.repeat( [0, 1], repeats = [ R3, R4 ] ), 
           'time':np.concatenate( [ np.repeat( A1,     repeats = R1 ),
                                    np.repeat( A2,     repeats = R1 ) ] ), 
           'group':        np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
           'object':       np.tile( np.arange( 0, R1 ),                T )
           }
        )

#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
                  df                                                  \
                    .groupby( ['measurement_id', 'time', 'group'] )    \
                    .apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
                    .explode()                                           \
                    .astype( 'float' )                                    \
                    .to_frame( 'var' )                                     \
                    .reset_index( drop = True )
                  ], axis = 1
                )

Nota: con el fin de tener un ejemplo mínimo, puede subconjustarse fácilmente (por ejemplo, con df.loc[df['time'] <= 400, :]), pero dado que simulo los datos de todos modos, pensé que el tamaño original daría una mejor visión general.

Para cada grupo definido por ['measurement_id', 'time', 'group']necesito llamar a la siguiente función:

from sklearn.cluster import SpectralClustering
from pandarallel     import pandarallel

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

Para mejorar el rendimiento probé dos enfoques:

Paquete Pandarallel

El primer enfoque fue paralelizar los cálculos usando el pandarallelpaquete:

pandarallel.initialize( progress_bar = True )
df \
  .groupby( ['measurement_id', 'time', 'group'] ) \
  .parallel_apply( lambda x: cluster( x['var'], x['object'] ) )

Sin embargo, esto parece ser subóptimo ya que consume mucha RAM y no todos los núcleos se usan en los cálculos (incluso a pesar de especificar explícitamente el número de núcleos en el pandarallel.initialize()método). Además, a veces los cálculos se terminan con varios errores, aunque no he tenido la oportunidad de encontrar una razón para eso (¿posiblemente falta de RAM?).

PySpark Pandas UDF

También probé un Spark Pandas UDF, aunque soy totalmente nuevo en Spark. Aquí está mi intento:

import findspark;  findspark.init()

from pyspark.sql           import SparkSession
from pyspark.conf          import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types     import *

spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )

@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
    if len( df['var'] ) >= 2:
        data = np.asarray( df['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( clustering.labels_ + 1,
                             index = df['object']
                             )
    else:
        return pd.DataFrame( np.nan,
                             index = df['object']
                             )

res = df                                           \
        .groupBy( ['id_half', 'frame', 'team_id'] ) \
        .apply( cluster )                            \
        .toPandas()

Desafortunadamente, el rendimiento también fue insatisfactorio, y por lo que leí sobre el tema, esto puede ser solo la carga del uso de la función UDF, escrita en Python y la necesidad asociada de convertir todos los objetos de Python en objetos Spark y viceversa.

Asi que aqui están mis preguntas:

  1. ¿Podría cualquiera de mis enfoques ajustarse para eliminar posibles cuellos de botella y mejorar el rendimiento? (p. ej., configuración de PySpark, ajuste de operaciones subóptimas, etc.)
  2. ¿Son mejores alternativas? ¿Cómo se comparan con las soluciones proporcionadas en términos de rendimiento?
Kuba_
fuente
2
investigaste dask ?
Danila Ganchar
1
Todavía no, pero gracias por su sugerencia - Lo
intentaré
desafortunadamente no dask
trabajé
Por rendimiento me refería al tiempo en que se pueden terminar los cálculos.
Kuba_

Respuestas:

1

P : " ¿Podría ajustarse cualquiera de mis enfoques para eliminar posibles cuellos de botella y mejorar el rendimiento? (Por ejemplo, configuración de PySpark, ajuste de operaciones subóptimas, etc.) "

+1por mencionar los costos generales del complemento de configuración para cualquier estrategia de computación. Esto siempre hace un punto de equilibrio, solo después de lo cual una falta de estrategia puede lograr una alegría beneficiosa de una aceleración de dominio deseada (sin embargo, si otros, los costos típicamente de dominio permiten o siguen siendo factibles, sí, RAM). .. existencia y acceso a un dispositivo de tal tamaño, presupuesto y otras restricciones similares del mundo real)[SERIAL][TIME][SPACE]

Primero,
la verificación previa al vuelo, antes de despegar.

La nueva formulación estricta de la Ley de Amdahl es capaz de incorporar estos dos pSO + pTOgastos generales adicionales y los refleja al predecir los niveles de Aceleración alcanzables, incluido el punto de equilibrio. punto, ya que puede ser significativo (en un sentido de costo / efecto, eficiencia) ir en paralelo.

ingrese la descripción de la imagen aquí

Sin embargo,
ese no es nuestro problema central aquí .
Esto viene después:

A continuación,
dados los costos computacionales de SpectralClustering(), que utilizará el núcleo de la función Radial Boltzmann, ~ exp( -gamma * distance( data, data )**2 )parece que no hay avance de la división del objeto datasobre cualquier número de unidades de trabajo disjuntas, ya que el distance( data, data )componente, por definición, no tiene más que visite todos los dataelementos (ref. los costos de comunicación de las { process | node }topologías distribuidas de paso de valor de cualquiera a cualquier son, por razones obvias, terriblemente malas, si no los peores casos de uso para el { process | node }procesamiento distribuido, si no los antipatrones directos (excepto algunas telas realmente arcanas, sin memoria / sin estado, pero computacionales).

Para los analistas pedantes, sí, agreguen a esto (y ya podemos decir que es un mal estado) los costos de, nuevamente , el procesamiento de k-significa cualquiera a cualquiera , aquí O( N^( 1 + 5 * 5 ) )va eso, para N ~ len( data ) ~ 1.12E6+, terriblemente en contra de nuestro deseo de tener algo Procesamiento inteligente y rápido.

¿Y qué?

Mientras que los costos de instalación no se descuidan, los aumento de los costos de comunicación es casi seguro desactivar cualquier mejora del uso de los intentos anteriormente esbozada para pasar de un puro- [SERIAL]flujo del proceso en alguna forma de simplemente - [CONCURRENT]o cierto- [PARALLEL]orquestación de algunas de trabajo subunidades , debido al aumento de los gastos generales relacionados con la necesidad de implementar (un par de tándem) topologías de paso de valor de cualquiera a cualquier.

Si no fuera por ellos?

Bueno, esto suena como un oxímoron de la ciencia de la computación, incluso si fuera posible, los costos de las distancias [TIME]precalculadas de uno a cualquier (lo que tomaría esos costos de complejidad de dominio inmenso "de antemano" (¿Dónde? ¿Cómo? ¿Hay alguna otro, latencia no evitable, que permite un posible enmascaramiento de latencia por parte de una acumulación incremental (desconocida hasta ahora) de una matriz de distancia completa en el futuro ¿cualquiera?)) pero solo reposicionaría estos costos principalmente presentes en alguna otra ubicación en [TIME]- y [SPACE]-Domains, no reducirlos.

P : "¿Son mejores alternativas? "

Hasta ahora, sé que el único es intentar, si es posible volver a formular el problema en otro, una forma problemática formulada con QUBO (ref .: Q uantum- U nocontrained- B inary- O ptimisation , una buena noticia es que las herramientas para hacerlo, una base de conocimiento de primera mano y experiencia práctica de resolución de problemas existen y crecen)

P : ¿Cómo se comparan con las soluciones proporcionadas en términos de rendimiento?

El rendimiento es impresionante: el problema formulado por QUBO tiene un O(1)solucionador prometedor (!) En tiempo constante (en el [TIME]dominio) y algo restringido en el [SPACE]dominio (donde los trucos LLNL recientemente anunciados pueden ayudar a evitar este mundo físico, la implementación actual de QPU, la restricción del problema tamaños).

usuario3666197
fuente
Esta es una respuesta interesante, pero parece perder el punto: OP entrena múltiples modelos pequeños, no uno solo. Por lo tanto, su observación central es sobre todo irrelevante.
user10938362
@ user10938362 ¿Cómo se traduce su propiedad reclamada (capacitación de modelos pequeños ) a alguna otra métrica de costos de procesamiento de big-O publicada anteriormente? Claro, muchos modelos más pequeños prometen una suma teóricamente creciente de costos (aún) grandes de O de procesamiento individual (ahora más pequeño en N, pero no en otros factores) , sin embargo, debe agregar a esto una suma terriblemente más cara de todo costos adicionales de los costos generales de instalación y terminación más todos los costos generales adicionales de comunicación (parámetros / datos / resultados + típicamente también pares de costos de procesamiento SER / DES en cada paso)
usuario3666197
0

Esta no es una respuesta, pero ...

Si tu corres

df.groupby(['measurement_id', 'time', 'group']).apply(
    lambda x: cluster(x['var'], x['object']))

(es decir, solo con Pandas), notará que ya está utilizando varios núcleos. Esto se debe a que sklearnutiliza joblibde forma predeterminada para paralelizar el trabajo. Usted puede intercambiar el planificador a favor de Dask y tal vez obtener una mayor eficiencia durante el intercambio de los datos entre los hilos, pero siempre y cuando el trabajo que están haciendo está vinculado a la CPU como este, no habrá nada que pueda hacer para acelerarlo.

En resumen, este es un problema de algoritmo: descubra lo que realmente necesita calcular, antes de tratar de considerar diferentes marcos para calcularlo.

duradero
fuente
¿Podría explicar por qué menciona "... compartir los datos entre subprocesos ..." una vez que la división del trabajo fue organizada por procesosjoblib generados , que no tienen nada que ver con los subprocesos, y menos con el intercambio? Gracias por su amable aclaración de los argumentos.
user3666197
Exactamente, jboblib normalmente usa procesos, pero también puede usar dask como back-end, donde puede elegir su mezcla de hilos y procesos.
mdurant
Soy un novato en la computación paralela, pero incluso si sklearn usa paralelización, ¿no es inútil en esta configuración? Quiero decir, las operaciones realizadas por sklearn son extremadamente simples ya que cada operación de agrupación se aplica a solo 10 puntos. De nuevo, podría estar equivocado aquí, pero creo que la forma en que paralelizamos el procesamiento de fragmentos de datos originales es el verdadero problema.
Kuba_
"no es inútil en esta configuración" - bueno, usas el rendimiento de 8 núcleos de CPU en lugar de 1.
duradero
0

No soy un experto Dask, pero proporciono el siguiente código como referencia:

import dask.dataframe as ddf

df = ddf.from_pandas(df, npartitions=4) # My PC has 4 cores

task = df.groupby(["measurement_id", "time", "group"]).apply(
    lambda x: cluster(x["var"], x["object"]),
    meta=pd.Series(np.nan, index=pd.Series([0, 1, 1, 1])),
)

res = task.compute()
chiflado
fuente