¿Hacer que Pandas DataFrame aplique () use todos los núcleos?

105

A partir de agosto de 2017, Pandas DataFame.apply () lamentablemente todavía está limitado a trabajar con un solo núcleo, lo que significa que una máquina de múltiples núcleos desperdiciará la mayor parte de su tiempo de cómputo cuando ejecute df.apply(myfunc, axis=1).

¿Cómo puede usar todos sus núcleos para ejecutar la aplicación en un marco de datos en paralelo?

Roko Mijic
fuente

Respuestas:

80

Puede utilizar el swifterpaquete:

pip install swifter

Funciona como un complemento para pandas, lo que le permite reutilizar la applyfunción:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Descubrirá automáticamente la forma más eficiente de paralelizar la función, sin importar si está vectorizada (como en el ejemplo anterior) o no.

Más ejemplos y una comparación de rendimiento están disponibles en GitHub. Tenga en cuenta que el paquete está en desarrollo activo, por lo que la API puede cambiar.

También tenga en cuenta que esto no funcionará automáticamente para columnas de cadena. Al usar cadenas, Swifter recurrirá a Pandas "simples" apply, que no serán paralelos. En este caso, incluso forzar su uso daskno creará mejoras de rendimiento, y sería mejor dividir su conjunto de datos manualmente y paralelizarlo usandomultiprocessing .

slhck
fuente
1
Por pura curiosidad, ¿hay alguna manera de limitar el número de núcleos que usa cuando se aplica en paralelo? Tengo un servidor compartido, así que si tomo los 32 núcleos, nadie estará contento.
Maksim Khaitovich
1
@MaximHaytovich No lo sé. Swifter usa dask en segundo plano, por lo que tal vez respete estas configuraciones: stackoverflow.com/a/40633117/435093 ; de lo contrario, recomendaría abrir un problema en GitHub. El autor es muy receptivo.
slhck
@slhck gracias! Lo cavaré un poco más. Parece que no funciona en el servidor de Windows de todos modos, simplemente se cuelga sin hacer nada en la tarea del juguete
Maksim Khaitovich
¿Pueden ayudarme a responder esto ?: - stackoverflow.com/questions/53561794/…
ak3191
2
Para cadenas, simplemente agregue allow_dask_on_strings(enable=True)así: df.swifter.allow_dask_on_strings(enable=True).apply(some_function) Fuente: github.com/jmcarpenter2/swifter/issues/45
Sumit Sidana
104

La forma más sencilla es utilizar map_partitions de Dask . Necesita estas importaciones (las necesitará pip install dask):

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import get

y la sintaxis es

data = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def myfunc(x,y,z, ...): return <whatever>

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)  

(Creo que 30 es un número adecuado de particiones si tiene 16 núcleos). Solo para completar, cronometré la diferencia en mi máquina (16 núcleos):

data = pd.DataFrame()
data['col1'] = np.random.normal(size = 1500000)
data['col2'] = np.random.normal(size = 1500000)

ddata = dd.from_pandas(data, npartitions=30)
def myfunc(x,y): return y*(x**2+1)
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1)
def pandas_apply(): return apply_myfunc_to_DF(data)
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get)  
def vectorized(): return myfunc(data['col1'], data['col2']  )

t_pds = timeit.Timer(lambda: pandas_apply())
print(t_pds.timeit(number=1))

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply())
print(t_dsk.timeit(number=1))

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized())
print(t_vec.timeit(number=1))

0.010668013244867325

Dando un factor de 10 aceleración desde pandas aplicar a dask aplicar en particiones. Por supuesto, si tiene una función que puede vectorizar, debería hacerlo; en este caso, la función ( y*(x**2+1)) está vectorizada trivialmente, pero hay muchas cosas que son imposibles de vectorizar.

Roko Mijic
fuente
2
Es bueno saberlo, gracias por publicar. ¿Puede explicar por qué eligió 30 particiones? ¿Cambia el rendimiento al cambiar este valor?
Andrew L
4
@AndrewL Supongo que cada partición es atendida por un proceso separado, y con 16 núcleos supongo que 16 o 32 procesos pueden ejecutarse simultáneamente. Lo probé y el rendimiento parece mejorar hasta en 32 particiones, pero los aumentos adicionales no tienen ningún efecto beneficioso. Supongo que con una máquina de cuatro núcleos querría 8 particiones, etc. Tenga en cuenta que noté una mejora entre 16 y 32, así que creo que realmente quiere 2x $ NUM_PROCESSORS
Roko Mijic
9
Lo único esThe get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
wordsforthewise
6
Para dask v0.20.0 y posteriores, use ddata.map_partitions (lambda df: df.apply ((lambda row: myfunc (* row)), axis = 1)). Compute (planificador = 'procesos'), o uno de los otras opciones del programador. El código actual arroja "TypeError: la palabra clave get = ha sido eliminada. Por favor, use la palabra clave planificador = en su lugar con el nombre del planificador deseado como 'hilos' o 'procesos'"
mork
1
Asegúrese de que antes de hacer esto, el marco de datos no tenga índices duplicados mientras lanza ValueError: cannot reindex from a duplicate axis. Para evitar eso, debe eliminar los índices duplicados por df = df[~df.index.duplicated()]o restablecer sus índices por df.reset_index(inplace=True).
Habib Karbasian
24

puede probar en su pandarallellugar: una herramienta simple y eficiente para paralelizar sus operaciones de pandas en todas sus CPU (en Linux y macOS)

  • La paralelización tiene un costo (instanciar nuevos procesos, enviar datos a través de memoria compartida, etc.), por lo que la paralelización es eficiente solo si la cantidad de cálculo para paralelizar es lo suficientemente alta. Para muy poca cantidad de datos, el uso de paralelismo no siempre vale la pena.
  • Las funciones aplicadas NO deben ser funciones lambda.
from pandarallel import pandarallel
from math import sin

pandarallel.initialize()

# FORBIDDEN
df.parallel_apply(lambda x: sin(x**2), axis=1)

# ALLOWED
def func(x):
    return sin(x**2)

df.parallel_apply(func, axis=1)

ver https://github.com/nalepae/pandarallel

G_KOBELIEF
fuente
hola, no puedo resolver un problema, usando pandarallel hay un error: AttributeError: No se puede encurtir el objeto local 'prepare_worker. <locals> .closure. <locals> .wrapper'. ¿Puedes ayudarme con esto?
Alex Cam
@Alex Sry No soy el desarrollador de ese módulo. ¿Cómo son tus códigos? ¿Puede intentar declarar sus "funciones internas" como globales? (solo adivina)
G_KOBELIEF
@AlexCam Su función debe definirse fuera de otra función para que Python pueda encurtirla para multiprocesamiento
Kenan
1
@G_KOBELIEF Con Python> 3.6 podemos usar la función lambda con pandaparallel
user110244
18

Si desea permanecer en Python nativo:

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])

aplicará la función fde forma paralela a la columna coldel marco de datosdf

Olivier Cruchant
fuente
Siguiendo un enfoque como este, obtuve un ValueError: Length of values does not match length of indexdesde __setitem__adentro pandas/core/frame.py. No estoy seguro de si hice algo mal o si asignar a df['newcol']no es seguro para subprocesos.
Sonajero
2
Puede escribir pool.map en una lista intermedia temp_result para permitir verificar si la longitud coincide con el df, y luego hacer un df ['newcol'] = temp_result?
Olivier Cruchant
¿Te refieres a crear la nueva columna? que usarias
Olivier Cruchant
sí, asignando el resultado del mapa a la nueva columna del marco de datos. ¿No devuelve el mapa una lista del resultado de cada fragmento enviado a la función f? Entonces, ¿qué sucede cuando asigna eso a la columna 'newcol? Usando Pandas y Python 3
Mina
¡En realidad funciona muy bien! ¿Lo intentaste? Crea una lista de la misma longitud del df, en el mismo orden que se envió. Literalmente hace c2 = f (c1) de forma paralela. No existe una forma más sencilla de realizar múltiples procesos en Python. En cuanto al rendimiento, parece que Ray también puede hacer cosas buenas ( haciadatascience.com/… ) pero no es tan maduro y la instalación no siempre funciona sin problemas en mi experiencia
Olivier Cruchant
2

Aquí hay un ejemplo de transformador base sklearn, en el que la aplicación de pandas está en paralelo

import multiprocessing as mp
from sklearn.base import TransformerMixin, BaseEstimator

class ParllelTransformer(BaseEstimator, TransformerMixin):
    def __init__(self,
                 n_jobs=1):
        """
        n_jobs - parallel jobs to run
        """
        self.variety = variety
        self.user_abbrevs = user_abbrevs
        self.n_jobs = n_jobs
    def fit(self, X, y=None):
        return self
    def transform(self, X, *_):
        X_copy = X.copy()
        cores = mp.cpu_count()
        partitions = 1

        if self.n_jobs <= -1:
            partitions = cores
        elif self.n_jobs <= 0:
            partitions = 1
        else:
            partitions = min(self.n_jobs, cores)

        if partitions == 1:
            # transform sequentially
            return X_copy.apply(self._transform_one)

        # splitting data into batches
        data_split = np.array_split(X_copy, partitions)

        pool = mp.Pool(cores)

        # Here reduce function - concationation of transformed batches
        data = pd.concat(
            pool.map(self._preprocess_part, data_split)
        )

        pool.close()
        pool.join()
        return data
    def _transform_part(self, df_part):
        return df_part.apply(self._transform_one)
    def _transform_one(self, line):
        # some kind of transformations here
        return line

para obtener más información, consulte https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8

Maxim Balatsko
fuente
0

Para utilizar todos los núcleos (físicos o lógicos), puede probar mapplycomo alternativa a swiftery pandarallel.

Puede establecer la cantidad de núcleos (y el comportamiento de fragmentación) al iniciar:

import pandas as pd
import mapply

mapply.init(n_workers=-1)

...

df.mapply(myfunc, axis=1)

De forma predeterminada ( n_workers=-1), el paquete utiliza todas las CPU físicas disponibles en el sistema. Si su sistema usa Hyper-Threading (por lo general, aparecería el doble de la cantidad de CPU físicas), mapplygenerará un trabajador adicional para priorizar el grupo de multiprocesamiento sobre otros procesos en el sistema.

Dependiendo de su definición de all your cores, también podría usar todos los núcleos lógicos en su lugar (tenga en cuenta que, de esta manera, los procesos vinculados a la CPU lucharán por las CPU físicas, lo que podría ralentizar su operación):

import multiprocessing
n_workers = multiprocessing.cpu_count()

# or more explicit
import psutil
n_workers = psutil.cpu_count(logical=True)
ddelange
fuente