Actualice un DataFrame en diferentes procesos de Python en tiempo real

8

Entonces, digamos que tiene un proceso de Python que está recopilando datos en tiempo real con alrededor de 500 filas por segundo (esto se puede paralelizar aún más para reducir a alrededor de 50 ps) de un sistema de colas y agregarlo a DataFrame:

rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
    recv = rq.get(block=True)
    # some converting
    df.append(recv, ignore_index = True)

Ahora la pregunta es: ¿Cómo utilizar las CPU basadas en estos datos? Por lo tanto, soy plenamente consciente de las limitaciones de GIL , y analicé el espacio de nombres del Administrador de multiprocesamiento , también aquí , pero parece que hay algunos inconvenientes con respecto a la latencia en el marco de datos de retención centenaria . Antes de profundizar en él, también probé pool.maplo que reconocí aplicar pickleentre los procesos, que es una forma lenta y tiene demasiada sobrecarga.

Entonces, después de todo esto, finalmente me pregunto cómo (si) se puede transferir una inserción de 500 filas por segundo (o incluso 50 filas por segundo) a diferentes procesos con algo de tiempo de CPU para aplicar estadísticas y heurísticas en los datos del niño procesos?

¿Quizás sería mejor implementar un socket tcp personalizado o un sistema de colas entre los dos procesos? ¿O hay algunas implementaciones en pandasotras bibliotecas para permitir realmente un acceso rápido al gran marco de datos en el proceso principal ? ¡Amo los pandas!

gies0r
fuente
¿Desea realizar las estadísticas solo en los fragmentos de 50 a 500 filas que son nuevas cada segundo y agregarlas continuamente a un gran DF? ¿El gran DF debe almacenarse o necesita más procesamiento en tiempo real para realizarlo?
Ronald Luc
@RonaldLuc Si es un requisito necesario, lo limitaría a las estadísticas de las nuevas 50 a 500 filas, sí. Podría mantener medias y altas / bajas en variables adicionales para realizar un seguimiento de los datos existentes en el gran DataFrame.
gies0r

Respuestas:

4

Antes de comenzar, debo decir que no nos contó mucho acerca de su código, pero tenga en mente este punto de transferir solo esas 50/500 filas nuevas cada segundo al proceso secundario e intentar crear ese gran DataFrameproceso secundario.

Estoy trabajando en un proyecto exactamente igual que tú. Python obtuvo muchas implementaciones de IPC como Pipey Queuecomo usted sabe. Shared MemoryLa solución puede ser problemática en muchos casos, la documentación oficial de AFAIK python advirtió sobre el uso de memorias compartidas.

En mi experiencia, la mejor manera de transformar datos entre solo dos procesos es Pipe, por lo que puede seleccionar DataFrame y enviarlo al otro punto final de la conexión. Le sugiero que evite los TCPsockets ( AF_INET) en su caso.

Los pandas DataFrameno pueden transformarse en otro proceso sin ser encurtidos y sin espinas. así que también te recomiendo que transfieras datos sin procesar como tipos integrados como en dictlugar de DataFrame. Esto podría hacer que el encurtido y el desempaque sean más rápidos y también tenga menos huellas de memoria.

AmirHmZ
fuente
Agradezco tu respuesta @AmirHmZ! Especialmente el enlace al punto de referencia es bueno (y las herramientas que lo rodean, que aún no conocía). Una solución en el Shared Memoryárea, que es de esperar que pueda manejar una gran cantidad de procesos de lectura de los procesos secundarios, mientras que los procesos principales se agregan a ella, podría hacerlo por lo que veo, si apenas restrinjo los accesos de escritura al proceso principal.
gies0r
.. Pero no sé, ¿si shared memoryestá en algún tipo de block stateescritura? Eso significaría que los procesos secundarios no pueden leer el DataFrame, mientras que el proceso primario se agrega a él (lo cual será casi siempre).
gies0r
@ gies0r Perdón por mi inactividad. Si va a utilizar una Shared Memorysolución, debe sincronizar los procesos secundarios con el proceso del proveedor. Puede hacerlo a través de multiprocessing.Lock: docs.python.org/3/library/…
AmirHmZ
0

La paralelización en pandases probablemente mejor manejada por otro motor por completo.

Eche un vistazo al proyecto Koalas de Databricks o el DataFrame de Dask .

jorijnsmit
fuente
Bueno ... Esa es una gran cantidad de código para revisar y arreglar ... Parece que Dasks tiene una buena adaptación, pero aún así es un montón de trabajo. ¿Conoce algún ejemplo en el que se haya implementado / documentado un intervalo de carga / actualización de datos como el mencionado en la pregunta?
gies0r
Utilicé Dask para procesar conjuntos de datos de más de 200 GB en paralelo y una pequeña huella de memoria, pero no estaba en línea. Dask es básicamente muchos marcos de datos de pandas apilados uno encima del otro.
Ronald Luc
@RonaldLuc ¿qué tipo de operaciones realizó en su máquina local?
Datanovice
Cargue desde parquet, operaciones numéricas en filas, cálculo de geolocalizaciones, alguna operación en "pandas locales DataFrame" ( df.map_partitions) y luego groupbyindexe (importante para el rendimiento en Dask), guarde como CSV.
Ronald Luc
0

Una solución simple sería separar el proceso en dos etapas diferentes. Use Asyncio para recibir los datos de manera no bloqueadora y realizar sus transformaciones dentro de eso. La segunda etapa consumiría una cola Asyncio para construir el DataFrame. Esto supone que no necesita el DataFrame disponible para un proceso diferente mientras recibe datos de Redis Queue.

Aquí hay un ejemplo de cómo construir un modelo de productor / consumidor con Asyncio

arshit arora
fuente