Indicador de progreso durante las operaciones de pandas

158

Regularmente realizo operaciones de pandas en marcos de datos de más de 15 millones de filas y me encantaría tener acceso a un indicador de progreso para operaciones particulares.

¿Existe un indicador de progreso basado en texto para las operaciones de división, aplicación y combinación de pandas?

Por ejemplo, en algo como:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

donde feature_rollupes una función algo complicada que toma muchas columnas DF y crea nuevas columnas de usuario a través de varios métodos. Estas operaciones pueden tomar un tiempo para grandes marcos de datos, por lo que me gustaría saber si es posible tener una salida basada en texto en un cuaderno iPython que me actualice sobre el progreso.

Hasta ahora, he probado los indicadores de progreso de bucle canónico para Python, pero no interactúan con los pandas de ninguna manera significativa.

Espero que haya algo que he pasado por alto en la biblioteca / documentación de pandas que le permite a uno saber el progreso de una combinación de aplicación dividida. Una implementación simple podría considerar el número total de subconjuntos de marcos de datos sobre los que applyfunciona la función e informar el progreso como la fracción completa de esos subconjuntos.

¿Es esto quizás algo que debe agregarse a la biblioteca?

cwharland
fuente
¿Has hecho un% poda (perfil) en el código? a veces puedes hacer operaciones en todo el marco antes de aplicar para eliminar los cuellos de botella
Jeff
@Jeff: apuesto a que lo hice antes para exprimir hasta el último rendimiento. El problema realmente se reduce al límite de pseudo-reducción de mapa en el que estoy trabajando, ya que las filas están en decenas de millones, por lo que no espero aumentos de súper velocidad, solo quiero algunos comentarios sobre el progreso.
cwharland
Considere cythonising: pandas.pydata.org/pandas-docs/dev/…
Andy Hayden
@AndyHayden: como comenté en su respuesta, su implementación es bastante buena y agrega una pequeña cantidad de tiempo al trabajo general. También hice cythonized tres operaciones dentro del paquete acumulativo de características que recuperó todo el tiempo que ahora se dedica a informar el progreso. Entonces, al final apuesto a que tendré barras de progreso con una reducción en el tiempo de procesamiento total si sigo con cython en toda la función.
cwharland

Respuestas:

277

Debido a la demanda popular, tqdmha agregado soporte para pandas. A diferencia de las otras respuestas, esto no ralentizará notablemente a los pandas : aquí hay un ejemplo para DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

En caso de que esté interesado en cómo funciona esto (y cómo modificarlo para sus propias devoluciones de llamada), vea los ejemplos en github , la documentación completa en pypi , o importe el módulo y ejecútelo help(tqdm).

EDITAR


Para responder directamente a la pregunta original, reemplace:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

con:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Nota: tqdm <= v4.8 : para versiones de tqdm por debajo de 4.8, en lugar de lo tqdm.pandas()que tenía que hacer:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())
casper.dcl
fuente
55
tqdmen realidad fue creado para iterables acaba de civil, país de origen: from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): passEl apoyo pandas era un corte reciente que hice :)
casper.dcl
66
Por cierto, si usa los cuadernos Jupyter, también puede usar tqdm_notebooks para obtener una barra más bonita. Junto con los pandas que le Actualmente necesario crear una instancia que al igual que from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs) ver aquí
grinsbaeckchen
2
A partir de la versión 4.8.1, use tqdm.pandas () en su lugar. github.com/tqdm/tqdm/commit/…
mork
1
Gracias, @mork es correcto. Estamos trabajando (lentamente) hacia tqdmv5, lo que hace que las cosas estén más modularizadas.
casper.dcl
1
Para una recomendación de sintaxis reciente, consulte la documentación de tqdm Pandas aquí: pypi.python.org/pypi/tqdm#pandas-integration
Manu CJ
18

Para ajustar la respuesta de Jeff (y tener esto como una función reutilizable).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Nota: el porcentaje de progreso de la aplicación se actualiza en línea . Si su función stdouts entonces esto no funcionará.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

Como de costumbre, puede agregar esto a sus objetos groupby como método:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

Como se menciona en los comentarios, esta no es una característica que los pandas centrales estarían interesados ​​en implementar. Pero Python le permite crear estos para muchos objetos / métodos de pandas (hacerlo sería bastante trabajo ... aunque debería poder generalizar este enfoque).

Andy Hayden
fuente
Digo "bastante trabajo", pero probablemente podría reescribir toda esta función como decorador (más general).
Andy Hayden
Gracias por ampliar la publicación de Jeff. He implementado ambos y la desaceleración para cada uno es bastante mínima (agregué un total de 1.1 minutos a una operación que tardó 27 minutos en completarse). De esta manera puedo ver el progreso y dada la naturaleza ad hoc de estas operaciones, creo que esta es una desaceleración aceptable.
cwharland
Excelente, me alegro de que haya ayudado. De hecho, me sorprendió la desaceleración (cuando probé un ejemplo), esperaba que fuera mucho peor.
Andy Hayden
1
Para aumentar aún más la eficacia de los métodos publicados, estaba siendo flojo con respecto a la importación de datos (¡los pandas son demasiado buenos para manejar csv desordenado!) Y algunas de mis entradas (~ 1%) habían eliminado completamente las inserciones (piense en conjunto registros insertados en campos individuales). La eliminación de estos causa una aceleración masiva en el resumen de características ya que no había ambigüedad sobre qué hacer durante las operaciones de división, aplicación y combinación.
cwharland
1
Tengo menos de 8 minutos ... pero agregué algunas cosas al paquete acumulativo de funciones (más funciones -> ¡mejor AUC!). Estos 8 minutos son por trozo (dos trozos en total en este momento) con cada trozo en el vecindario de 12 millones de filas. Entonces sí ... 16 minutos para realizar operaciones considerables en 24 millones de filas usando HDFStore (y hay cosas nltk en el paquete acumulativo de funciones). Bastante bien. Esperemos que Internet no me juzgue por la ignorancia inicial o la ambivalencia hacia las inserciones desordenadas =)
cwharland 05 de
11

En caso de que necesite soporte sobre cómo usar esto en un cuaderno Jupyter / ipython, como lo hice, aquí hay una guía útil y una fuente del artículo relevante :

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Tenga en cuenta el guión bajo en la declaración de importación para _tqdm_notebook. Como se menciona en el artículo mencionado, el desarrollo se encuentra en una etapa beta tardía.

Victor Vulovic
fuente
8

Para cualquiera que esté buscando aplicar tqdm en su código personalizado de aplicación de pandas paralelo.

(Probé algunas de las bibliotecas para la paralelización a lo largo de los años, pero nunca encontré una solución de paralelización al 100%, principalmente para la función de aplicación, y siempre tuve que volver por mi código "manual").

df_multi_core : este es el que usted llama. Acepta:

  1. Tu objeto df
  2. El nombre de la función que le gustaría llamar
  3. El subconjunto de columnas sobre el que se puede realizar la función (ayuda a reducir el tiempo / memoria)
  4. El número de trabajos para ejecutar en paralelo (-1 u omitir para todos los núcleos)
  5. Cualquier otro kwargs que acepte la función de df (como "axis")

_df_split : esta es una función auxiliar interna que debe colocarse globalmente en el módulo en ejecución (Pool.map es "dependiente de la ubicación"), de lo contrario la ubicaría internamente.

Aquí está el código de mi esencia ( agregaré más pruebas de función de pandas allí):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

A continuación se muestra un código de prueba para una aplicación paralela con tqdm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

En la salida puede ver 1 barra de progreso para ejecutar sin paralelización y barras de progreso por núcleo cuando se ejecuta con paralelización. Hay una ligera interrupción y, a veces, el resto de los núcleos aparecen a la vez, pero incluso entonces creo que es útil ya que obtienes las estadísticas de progreso por núcleo (it / sec y registros totales, por ejemplo)

ingrese la descripción de la imagen aquí

¡Gracias @abcdaa por esta gran biblioteca!

mork
fuente
1
Gracias @mork: siéntase libre de agregar a github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar o cree una nueva página en github.com/tqdm/tqdm/wiki
casper. dcl
Gracias, pero tuve que cambiar estas partes: try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)debido a la excepción KeyError en lugar de ValueError, cambie a Exception para manejar todos los casos.
Marius
Gracias @mork: esta respuesta debería ser mayor.
Andy
5

Puedes hacerlo fácilmente con un decorador

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

luego use la función modified_function (y cambie cuando quiera que se imprima)

Jeff
fuente
1
¡La advertencia obvia es que esto ralentizará su función! Incluso podría actualizarlo con el progreso stackoverflow.com/questions/5426546/… por ejemplo, contar / len como porcentaje.
Andy Hayden
sí, tendrá orden (número de grupos), por lo que, dependiendo de cuál sea su cuello de botella, esto podría marcar la diferencia
Jeff
Quizás lo más intuitivo es envolver esto en una logged_apply(g, func)función, donde tendría acceso al pedido, y podría iniciar sesión desde el principio.
Andy Hayden
Hice lo anterior en mi respuesta, también actualización de porcentaje descarado. En realidad, no pude hacer que el tuyo funcionara ... Creo que con la parte de la envoltura. Si lo está utilizando para la aplicación, no es tan importante de todos modos.
Andy Hayden
1

He cambiado la respuesta de Jeff , para incluir un total, de modo que pueda seguir el progreso y una variable para imprimir cada X iteraciones (esto en realidad mejora mucho el rendimiento, si el "print_at" es razonablemente alto)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

la función clear_output () es de

from IPython.core.display import clear_output

si no está en IPython, la respuesta de Andy Hayden lo hace sin ella

Filipe Silva
fuente