¿Hay una manera directa de ejecutar pandas.DataFrame.isin en paralelo?

25

Tengo un programa de modelado y puntaje que hace un uso intensivo de la DataFrame.isinfunción de los pandas, buscando en las listas de Facebook "me gusta" registros de usuarios individuales para cada uno de unos pocos miles de páginas específicas. Esta es la parte del programa que consume más tiempo, más que las piezas de modelado o puntuación, simplemente porque solo se ejecuta en un núcleo, mientras que el resto se ejecuta en unas pocas docenas simultáneamente.

Aunque sé que podría dividir manualmente el marco de datos en fragmentos y ejecutar la operación en paralelo, ¿hay alguna forma directa de hacerlo automáticamente? En otras palabras, ¿hay algún tipo de paquete que reconozca que estoy ejecutando una operación fácilmente delegada y la distribuya automáticamente? Quizás eso es pedir demasiado, pero en el pasado ya me sorprendió lo que ya está disponible en Python, así que creo que vale la pena preguntar.

Cualquier otra sugerencia sobre cómo se podría lograr esto (¡incluso si no fuera por un paquete mágico de unicornio!) También sería apreciada. Principalmente, solo tratando de encontrar una manera de ahorrar 15-20 minutos por corrida sin pasar la misma cantidad de tiempo codificando la solución.

Therriault
fuente
¿Qué tan grande es su lista de valores? ¿Has tratado de pasarlo como un conjunto? Para el paralelismo, puede estar interesado en Joblib. Es fácil de usar y puede acelerar los cálculos. Úselo con grandes fragmentos de datos.
oo
Otra opción es replantear su problema como una unión. Las uniones son mucho más rápidas en Pandas stackoverflow.com/questions/23945493/…
Brian Spiering
Otra opción es usar np.in1d, que también es más rápido stackoverflow.com/questions/21738882/fast-pandas-filtering
Brian Spiering

Respuestas:

8

Desafortunadamente, la paralelización aún no se implementa en los pandas. Puede unirse a este problema de Github si desea participar en el desarrollo de esta función.

No conozco ningún "paquete de unicornio mágico" para este propósito, así que lo mejor será escribir tu propia solución. Pero si aún no desea dedicar tiempo a eso y desea aprender algo nuevo, puede probar los dos métodos integrados en MongoDB (reducción de mapas y marco agg). Ver mongodb_agg_framework .

Stanpol
fuente
0

Hay una versión más común de esta pregunta con respecto a la paralelización en pandas función de aplicación de, así que esta es una pregunta refrescante :)

Primero , quiero mencionar más rápido ya que solicitó una solución "empaquetada", y aparece en la mayoría de las preguntas SO con respecto a la paralelización de pandas.

Pero.. todavía me gustaría compartir mi código personal para ello, ya que después de varios años de trabajar con DataFrame nunca encontré una solución de paralelización al 100% (principalmente para la función de aplicación) y siempre tuve que volver por mi " manual "código.

Gracias a usted, lo hice más genérico para admitir cualquier método DataFrame (teóricamente) por su nombre (por lo que no tendrá que mantener versiones para isin, apply, etc.).

Lo probé en las funciones "isin", "apply" e "isna" usando Python 2.7 y 3.6. Tiene menos de 20 líneas, y seguí la convención de nomenclatura de pandas como "subconjunto" y "njobs".

También agregué una comparación de tiempo con el código equivalente dask para "isin" y parece ~ X2 veces más lento que esta esencia.

Incluye 2 funciones:

df_multi_core : este es el que usted llama. Acepta:

  1. Tu objeto df
  2. El nombre de la función que le gustaría llamar
  3. El subconjunto de columnas sobre el que se puede realizar la función (ayuda a reducir el tiempo / memoria)
  4. El número de trabajos a ejecutar en paralelo (-1 u omitir para todos los núcleos)
  5. Cualquier otro kwargs que acepte la función de df (como "axis")

_df_split : esta es una función auxiliar interna que debe colocarse globalmente en el módulo en ejecución (Pool.map es "dependiente de la ubicación"), de lo contrario, la ubicaría internamente.

Aquí está el código de mi esencia ( agregaré más pruebas de función de pandas allí):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

A continuación, se muestra un código de prueba para un isin paralelo , que compara el rendimiento nativo, multinúcleo y básico. En una máquina I7 con 8 núcleos físicos, obtuve alrededor de X4 veces la aceleración. ¡Me encantaría saber lo que obtienes de tus datos reales!

from time import time

if __name__ == '__main__': 
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))


    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda df: df.apply(apply_f, axis=1)).compute(scheduler='processes')
    t6 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))

--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1    953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
mork
fuente
@Therriault Agregué una comparación de Dask isin- parece que el fragmento de código es más efectivo con 'isin' - ~ X1.75 veces más rápido que Dask (en comparación con la applyfunción que solo obtuvo un 5% más rápido que Dask)
mork