Entonces, digamos que tiene un proceso de Python que está recopilando datos en tiempo real con alrededor de 500 filas por segundo (esto se puede paralelizar aún más para reducir a alrededor de 50 ps) de un sistema de colas y agregarlo a DataFrame
:
rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
recv = rq.get(block=True)
# some converting
df.append(recv, ignore_index = True)
Ahora la pregunta es: ¿Cómo utilizar las CPU basadas en estos datos? Por lo tanto, soy plenamente consciente de las limitaciones de GIL , y analicé el espacio de nombres del Administrador de multiprocesamiento , también aquí , pero parece que hay algunos inconvenientes con respecto a la latencia en el marco de datos de retención centenaria . Antes de profundizar en él, también probé pool.map
lo que reconocí aplicar pickle
entre los procesos, que es una forma lenta y tiene demasiada sobrecarga.
Entonces, después de todo esto, finalmente me pregunto cómo (si) se puede transferir una inserción de 500 filas por segundo (o incluso 50 filas por segundo) a diferentes procesos con algo de tiempo de CPU para aplicar estadísticas y heurísticas en los datos del niño procesos?
¿Quizás sería mejor implementar un socket tcp personalizado o un sistema de colas entre los dos procesos? ¿O hay algunas implementaciones en pandas
otras bibliotecas para permitir realmente un acceso rápido al gran marco de datos en el proceso principal ? ¡Amo los pandas!
Respuestas:
Antes de comenzar, debo decir que no nos contó mucho acerca de su código, pero tenga en mente este punto de transferir solo esas 50/500 filas nuevas cada segundo al proceso secundario e intentar crear ese gran
DataFrame
proceso secundario.Estoy trabajando en un proyecto exactamente igual que tú. Python obtuvo muchas implementaciones de IPC como
Pipe
yQueue
como usted sabe.Shared Memory
La solución puede ser problemática en muchos casos, la documentación oficial de AFAIK python advirtió sobre el uso de memorias compartidas.En mi experiencia, la mejor manera de transformar datos entre solo dos procesos es
Pipe
, por lo que puede seleccionar DataFrame y enviarlo al otro punto final de la conexión. Le sugiero que evite losTCP
sockets (AF_INET
) en su caso.Los pandas
DataFrame
no pueden transformarse en otro proceso sin ser encurtidos y sin espinas. así que también te recomiendo que transfieras datos sin procesar como tipos integrados como endict
lugar de DataFrame. Esto podría hacer que el encurtido y el desempaque sean más rápidos y también tenga menos huellas de memoria.fuente
Shared Memory
área, que es de esperar que pueda manejar una gran cantidad de procesos de lectura de los procesos secundarios, mientras que los procesos principales se agregan a ella, podría hacerlo por lo que veo, si apenas restrinjo los accesos de escritura al proceso principal.shared memory
está en algún tipo deblock state
escritura? Eso significaría que los procesos secundarios no pueden leer el DataFrame, mientras que el proceso primario se agrega a él (lo cual será casi siempre).Shared Memory
solución, debe sincronizar los procesos secundarios con el proceso del proveedor. Puede hacerlo a través demultiprocessing.Lock
: docs.python.org/3/library/…La paralelización en
pandas
es probablemente mejor manejada por otro motor por completo.Eche un vistazo al proyecto Koalas de Databricks o el DataFrame de Dask .
fuente
df.map_partitions
) y luegogroupby
indexe (importante para el rendimiento en Dask), guarde como CSV.Una solución simple sería separar el proceso en dos etapas diferentes. Use Asyncio para recibir los datos de manera no bloqueadora y realizar sus transformaciones dentro de eso. La segunda etapa consumiría una cola Asyncio para construir el DataFrame. Esto supone que no necesita el DataFrame disponible para un proceso diferente mientras recibe datos de Redis Queue.
Aquí hay un ejemplo de cómo construir un modelo de productor / consumidor con Asyncio
fuente