Aplicar de manera eficiente una función a un DataFrame de pandas agrupado en paralelo

89

A menudo necesito aplicar una función a los grupos de un muy grande DataFrame(de tipos de datos mixtos) y me gustaría aprovechar varios núcleos.

Puedo crear un iterador a partir de los grupos y usar el módulo de multiprocesamiento, pero no es eficiente porque cada grupo y los resultados de la función deben ser seleccionados para la mensajería entre procesos.

¿Hay alguna forma de evitar el decapado o incluso evitar la copia del DataFramecompleto? Parece que las funciones de memoria compartida de los módulos de multiprocesamiento se limitan a numpymatrices. ¿Hay más opciones?

user2303
fuente
Hasta donde yo sé, no hay forma de compartir objetos arbitrarios. Me pregunto si el decapado toma mucho más tiempo que la ganancia a través del multiprocesamiento. Tal vez debería buscar la posibilidad de crear paquetes de trabajo más grandes para cada proceso para reducir el tiempo relativo de decapado. Otra posibilidad sería utilizar el multiprocesamiento al crear los grupos.
Sebastian Werk
3
Hago algo así, pero usando UWSGI, Flask y preforking: cargo el marco de datos de pandas en un proceso, lo bifurco x veces (convirtiéndolo en un objeto de memoria compartida) y luego llamo a esos procesos desde otro proceso de Python donde concaté los resultados. atm Utilizo JSON como un proceso de comunicación, pero esto está llegando (pero aún muy experimental): pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental
Carst
Por cierto, ¿alguna vez miraste HDF5 con fragmentación? (HDF5 no se guarda para escritura simultánea, pero también puede guardar en archivos separados y al final concatenar cosas)
Carst
7
esto será para 0.14, vea este problema: github.com/pydata/pandas/issues/5751
Jeff
4
@Jeff fue empujado a 0.15 = (
pyCthon

Respuestas:

12

De los comentarios anteriores, parece que esto está planeado por pandasalgún tiempo (también hay un rosettaproyecto de aspecto interesante que acabo de notar).

Sin embargo, hasta que se incorporen todas las funciones paralelas pandas, noté que es muy fácil escribir aumentos paralelos eficientes y sin copia de memoria para pandasusar directamente cython+ OpenMP y C ++.

Aquí hay un breve ejemplo de cómo escribir un grupo por suma paralelo, cuyo uso es algo como esto:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print para_group_demo.sum(df.a, df.b)

y la salida es:

     sum
key     
0      6
1      11
2      4

Nota Sin duda, la funcionalidad de este ejemplo simple eventualmente será parte de pandas. Algunas cosas, sin embargo, será más natural para paralelizar en C ++ durante algún tiempo, y es importante ser consciente de lo fácil que es combinar esto en pandas.


Para hacer esto, escribí una extensión simple de archivo de fuente única cuyo código sigue.

Comienza con algunas importaciones y definiciones de tipos.

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

El unordered_maptipo C ++ es para sumar por un solo hilo y el vectores para sumar por todos los hilos.

Ahora a la función sum. Comienza con vistas de memoria escritas para un acceso rápido:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

La función continúa dividiendo el semi-igualmente a los subprocesos (aquí codificado a 4), y haciendo que cada subproceso sume las entradas en su rango:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True): 
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

Cuando los hilos se han completado, la función fusiona todos los resultados (de los diferentes rangos) en uno solo unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)        

Todo lo que queda es crear un DataFramey devolver los resultados:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
Ami Tavory
fuente