A menudo necesito aplicar una función a los grupos de un muy grande DataFrame
(de tipos de datos mixtos) y me gustaría aprovechar varios núcleos.
Puedo crear un iterador a partir de los grupos y usar el módulo de multiprocesamiento, pero no es eficiente porque cada grupo y los resultados de la función deben ser seleccionados para la mensajería entre procesos.
¿Hay alguna forma de evitar el decapado o incluso evitar la copia del DataFrame
completo? Parece que las funciones de memoria compartida de los módulos de multiprocesamiento se limitan a numpy
matrices. ¿Hay más opciones?
python
pandas
multiprocessing
shared-memory
user2303
fuente
fuente
Respuestas:
De los comentarios anteriores, parece que esto está planeado por
pandas
algún tiempo (también hay unrosetta
proyecto de aspecto interesante que acabo de notar).Sin embargo, hasta que se incorporen todas las funciones paralelas
pandas
, noté que es muy fácil escribir aumentos paralelos eficientes y sin copia de memoria parapandas
usar directamentecython
+ OpenMP y C ++.Aquí hay un breve ejemplo de cómo escribir un grupo por suma paralelo, cuyo uso es algo como esto:
import pandas as pd import para_group_demo df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) print para_group_demo.sum(df.a, df.b)
y la salida es:
sum key 0 6 1 11 2 4
Nota Sin duda, la funcionalidad de este ejemplo simple eventualmente será parte de
pandas
. Algunas cosas, sin embargo, será más natural para paralelizar en C ++ durante algún tiempo, y es importante ser consciente de lo fácil que es combinar esto enpandas
.Para hacer esto, escribí una extensión simple de archivo de fuente única cuyo código sigue.
Comienza con algunas importaciones y definiciones de tipos.
from libc.stdint cimport int64_t, uint64_t from libcpp.vector cimport vector from libcpp.unordered_map cimport unordered_map cimport cython from cython.operator cimport dereference as deref, preincrement as inc from cython.parallel import prange import pandas as pd ctypedef unordered_map[int64_t, uint64_t] counts_t ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t ctypedef vector[counts_t] counts_vec_t
El
unordered_map
tipo C ++ es para sumar por un solo hilo y elvector
es para sumar por todos los hilos.Ahora a la función
sum
. Comienza con vistas de memoria escritas para un acceso rápido:def sum(crit, vals): cdef int64_t[:] crit_view = crit.values cdef int64_t[:] vals_view = vals.values
La función continúa dividiendo el semi-igualmente a los subprocesos (aquí codificado a 4), y haciendo que cada subproceso sume las entradas en su rango:
cdef uint64_t num_threads = 4 cdef uint64_t l = len(crit) cdef uint64_t s = l / num_threads + 1 cdef uint64_t i, j, e cdef counts_vec_t counts counts = counts_vec_t(num_threads) counts.resize(num_threads) with cython.boundscheck(False): for i in prange(num_threads, nogil=True): j = i * s e = j + s if e > l: e = l while j < e: counts[i][crit_view[j]] += vals_view[j] inc(j)
Cuando los hilos se han completado, la función fusiona todos los resultados (de los diferentes rangos) en uno solo
unordered_map
:cdef counts_t total cdef counts_it_t it, e_it for i in range(num_threads): it = counts[i].begin() e_it = counts[i].end() while it != e_it: total[deref(it).first] += deref(it).second inc(it)
Todo lo que queda es crear un
DataFrame
y devolver los resultados:key, sum_ = [], [] it = total.begin() e_it = total.end() while it != e_it: key.append(deref(it).first) sum_.append(deref(it).second) inc(it) df = pd.DataFrame({'key': key, 'sum': sum_}) df.set_index('key', inplace=True) return df
fuente