Puede utilizar el swifter
paquete:
pip install swifter
Funciona como un complemento para pandas, lo que le permite reutilizar la apply
función:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
Descubrirá automáticamente la forma más eficiente de paralelizar la función, sin importar si está vectorizada (como en el ejemplo anterior) o no.
Más ejemplos y una comparación de rendimiento están disponibles en GitHub. Tenga en cuenta que el paquete está en desarrollo activo, por lo que la API puede cambiar.
También tenga en cuenta que esto no funcionará automáticamente para columnas de cadena. Al usar cadenas, Swifter recurrirá a Pandas "simples" apply
, que no serán paralelos. En este caso, incluso forzar su uso dask
no creará mejoras de rendimiento, y sería mejor dividir su conjunto de datos manualmente y paralelizarlo usandomultiprocessing
.
allow_dask_on_strings(enable=True)
así:df.swifter.allow_dask_on_strings(enable=True).apply(some_function)
Fuente: github.com/jmcarpenter2/swifter/issues/45La forma más sencilla es utilizar map_partitions de Dask . Necesita estas importaciones (las necesitará
pip install dask
):import pandas as pd import dask.dataframe as dd from dask.multiprocessing import get
y la sintaxis es
data = <your_pandas_dataframe> ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y,z, ...): return <whatever> res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get)
(Creo que 30 es un número adecuado de particiones si tiene 16 núcleos). Solo para completar, cronometré la diferencia en mi máquina (16 núcleos):
data = pd.DataFrame() data['col1'] = np.random.normal(size = 1500000) data['col2'] = np.random.normal(size = 1500000) ddata = dd.from_pandas(data, npartitions=30) def myfunc(x,y): return y*(x**2+1) def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1) def pandas_apply(): return apply_myfunc_to_DF(data) def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get) def vectorized(): return myfunc(data['col1'], data['col2'] ) t_pds = timeit.Timer(lambda: pandas_apply()) print(t_pds.timeit(number=1))
t_dsk = timeit.Timer(lambda: dask_apply()) print(t_dsk.timeit(number=1))
t_vec = timeit.Timer(lambda: vectorized()) print(t_vec.timeit(number=1))
Dando un factor de 10 aceleración desde pandas aplicar a dask aplicar en particiones. Por supuesto, si tiene una función que puede vectorizar, debería hacerlo; en este caso, la función (
y*(x**2+1)
) está vectorizada trivialmente, pero hay muchas cosas que son imposibles de vectorizar.fuente
The get= keyword has been deprecated. Please use the scheduler= keyword instead with the name of the desired scheduler like 'threads' or 'processes'
ValueError: cannot reindex from a duplicate axis
. Para evitar eso, debe eliminar los índices duplicados pordf = df[~df.index.duplicated()]
o restablecer sus índices pordf.reset_index(inplace=True)
.puede probar en su
pandarallel
lugar: una herramienta simple y eficiente para paralelizar sus operaciones de pandas en todas sus CPU (en Linux y macOS)from pandarallel import pandarallel from math import sin pandarallel.initialize() # FORBIDDEN df.parallel_apply(lambda x: sin(x**2), axis=1) # ALLOWED def func(x): return sin(x**2) df.parallel_apply(func, axis=1)
ver https://github.com/nalepae/pandarallel
fuente
Si desea permanecer en Python nativo:
import multiprocessing as mp with mp.Pool(mp.cpu_count()) as pool: df['newcol'] = pool.map(f, df['col'])
aplicará la función
f
de forma paralela a la columnacol
del marco de datosdf
fuente
ValueError: Length of values does not match length of index
desde__setitem__
adentropandas/core/frame.py
. No estoy seguro de si hice algo mal o si asignar adf['newcol']
no es seguro para subprocesos.Aquí hay un ejemplo de transformador base sklearn, en el que la aplicación de pandas está en paralelo
import multiprocessing as mp from sklearn.base import TransformerMixin, BaseEstimator class ParllelTransformer(BaseEstimator, TransformerMixin): def __init__(self, n_jobs=1): """ n_jobs - parallel jobs to run """ self.variety = variety self.user_abbrevs = user_abbrevs self.n_jobs = n_jobs def fit(self, X, y=None): return self def transform(self, X, *_): X_copy = X.copy() cores = mp.cpu_count() partitions = 1 if self.n_jobs <= -1: partitions = cores elif self.n_jobs <= 0: partitions = 1 else: partitions = min(self.n_jobs, cores) if partitions == 1: # transform sequentially return X_copy.apply(self._transform_one) # splitting data into batches data_split = np.array_split(X_copy, partitions) pool = mp.Pool(cores) # Here reduce function - concationation of transformed batches data = pd.concat( pool.map(self._preprocess_part, data_split) ) pool.close() pool.join() return data def _transform_part(self, df_part): return df_part.apply(self._transform_one) def _transform_one(self, line): # some kind of transformations here return line
para obtener más información, consulte https://towardsdatascience.com/4-easy-steps-to-improve-your-machine-learning-code-performance-88a0b0eeffa8
fuente
Para utilizar todos los núcleos (físicos o lógicos), puede probar
mapply
como alternativa aswifter
ypandarallel
.Puede establecer la cantidad de núcleos (y el comportamiento de fragmentación) al iniciar:
import pandas as pd import mapply mapply.init(n_workers=-1) ... df.mapply(myfunc, axis=1)
De forma predeterminada (
n_workers=-1
), el paquete utiliza todas las CPU físicas disponibles en el sistema. Si su sistema usa Hyper-Threading (por lo general, aparecería el doble de la cantidad de CPU físicas),mapply
generará un trabajador adicional para priorizar el grupo de multiprocesamiento sobre otros procesos en el sistema.Dependiendo de su definición de
all your cores
, también podría usar todos los núcleos lógicos en su lugar (tenga en cuenta que, de esta manera, los procesos vinculados a la CPU lucharán por las CPU físicas, lo que podría ralentizar su operación):import multiprocessing n_workers = multiprocessing.cpu_count() # or more explicit import psutil n_workers = psutil.cpu_count(logical=True)
fuente