¿iterador / generador de SqlAlchemy incorporado eficiente en memoria?

90

Tengo una tabla MySQL de registros de ~ 10M con la que interactúo usando SqlAlchemy. Descubrí que las consultas en grandes subconjuntos de esta tabla consumirán demasiada memoria a pesar de que pensé que estaba usando un generador incorporado que obtenía inteligentemente fragmentos del tamaño de un bocado del conjunto de datos:

for thing in session.query(Things):
    analyze(thing)

Para evitar esto, encuentro que tengo que construir mi propio iterador que muerde en trozos:

lastThingID = None
while True:
    things = query.filter(Thing.id < lastThingID).limit(querySize).all()
    if not rows or len(rows) == 0: 
        break
    for thing in things:
        lastThingID = row.id
        analyze(thing)

¿Es esto normal o hay algo que me falta con respecto a los generadores integrados de SA?

La respuesta a esta pregunta parece indicar que el consumo de memoria no es de esperar.

Paul
fuente
Tengo algo muy similar, excepto que produce "cosa". Funciona mejor que todas las demás soluciones
iElectric
2
¿No es Thing.id> lastThingID? ¿Y qué son las "filas"?
sinérgico

Respuestas:

118

La mayoría de las implementaciones de DBAPI almacenan en búfer las filas a medida que se obtienen; por lo general, antes de que el ORM de SQLAlchemy obtenga un resultado, todo el conjunto de resultados está en la memoria.

Pero entonces, la forma en que Queryfunciona es que carga completamente el conjunto de resultados dado de forma predeterminada antes de devolverle sus objetos. El fundamento aquí se refiere a las consultas que son más que simples declaraciones SELECT. Por ejemplo, en las uniones a otras tablas que pueden devolver la misma identidad de objeto varias veces en un conjunto de resultados (común con la carga ansiosa), el conjunto completo de filas debe estar en la memoria para que los resultados correctos se puedan devolver, de lo contrario, colecciones y tal podría estar solo parcialmente poblado.

Entonces Queryofrece una opción para cambiar este comportamiento a través de yield_per(). Esta llamada hará Queryque produzca filas en lotes, donde le da el tamaño del lote. Como dicen los documentos, esto solo es apropiado si no está haciendo ningún tipo de carga ansiosa de colecciones, por lo que básicamente es si realmente sabe lo que está haciendo. Además, si el DBAPI subyacente almacena previamente las filas en el búfer, seguirá existiendo esa sobrecarga de memoria, por lo que el enfoque solo se escala ligeramente mejor que no usarlo.

Casi nunca lo uso yield_per(); en cambio, utilizo una mejor versión del enfoque LIMIT que sugieres anteriormente usando funciones de ventana. LIMIT y OFFSET tienen un gran problema de que los valores de OFFSET muy grandes hacen que la consulta se vuelva cada vez más lenta, ya que un OFFSET de N hace que recorra N filas; es como hacer la misma consulta cincuenta veces en lugar de una, cada vez que lee un mayor y mayor número de filas. Con un enfoque de función de ventana, obtengo previamente un conjunto de valores de "ventana" que se refieren a partes de la tabla que quiero seleccionar. Luego emito declaraciones SELECT individuales que cada una extrae de una de esas ventanas a la vez.

El enfoque de la función de ventana está en la wiki y lo uso con gran éxito.

También tenga en cuenta: no todas las bases de datos admiten funciones de ventana; necesita Postgresql, Oracle o SQL Server. En mi humilde opinión, usar al menos Postgresql definitivamente vale la pena: si está usando una base de datos relacional, también podría usar la mejor.

zzzeek
fuente
Mencionas Query instanciates todo para comparar identidades. ¿Podría evitarse esto clasificando la clave principal y solo comparando resultados consecutivos?
Tobu
el problema es que si se obtiene una instancia con identidad X, la aplicación la controla y luego toma decisiones basadas en esta entidad, y tal vez incluso la muta. Más tarde, quizás (en realidad, por lo general) incluso en la siguiente fila, la misma identidad regresa en el resultado, quizás para agregar más contenido a sus colecciones. Por lo tanto, la aplicación recibió el objeto en un estado incompleto. ordenar no ayuda aquí porque el mayor problema es el funcionamiento de la carga ansiosa: tanto la carga "unida" como la "subconsulta" tienen problemas diferentes.
zzzeek
Entendí lo de "la siguiente fila actualiza las colecciones", en cuyo caso solo necesita mirar hacia adelante una fila de base de datos para saber cuando las colecciones están completas. La implementación de la carga ansiosa tendría que cooperar con la clasificación, de modo que las actualizaciones de la colección siempre se realicen en filas adyacentes.
Tobu
la opción yield_per () siempre está ahí para cuando esté seguro de que la consulta que está emitiendo es compatible con la entrega de conjuntos de resultados parciales. Pasé una sesión maratón de varios días tratando de habilitar este comportamiento en todos los casos, siempre hubo oscuros, es decir, hasta que tu programa usa uno de ellos, bordes que fallaron. En particular, no se puede asumir que depender de los pedidos. Como siempre, soy bienvenido a las contribuciones de código reales.
zzzeek
1
Como estoy usando postgres, parece que es posible usar la transacción de solo lectura de lectura repetible y ejecutar todas las consultas en ventana en esa transacción.
schatten
24

No soy un experto en bases de datos, pero cuando uso SQLAlchemy como una simple capa de abstracción de Python (es decir, no uso el objeto de consulta ORM), se me ocurrió una solución satisfactoria para consultar una tabla de 300M de filas sin aumentar el uso de memoria ...

Aquí hay un ejemplo ficticio:

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)

Luego, uso el fetchmany()método SQLAlchemy para iterar sobre los resultados en un whilebucle infinito :

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        # Do your stuff here...

proxy.close()

Este método me permitió hacer todo tipo de agregación de datos sin ninguna sobrecarga de memoria peligrosa.

NOTE el stream_resultstrabaja con PostgreSQL y el pyscopg2adaptador, pero supongo que no va a funcionar con cualquier DBAPI, ni con cualquier controlador de base de datos ...

Hay un caso de uso interesante en esta publicación de blog que inspiró mi método anterior.

Edouardtheron
fuente
1
Si uno está trabajando en postgres o mysql (con pymysql), esta debería ser la respuesta aceptada en mi humilde opinión.
Yuki Inoue
1
Me salvó la vida, estaba viendo que mis consultas se ejecutaban cada vez más lentamente. He instrumentado lo anterior en pyodbc (desde el servidor sql hasta postgres) y está funcionando como un sueño.
Ed Baker
Este fue para mí el mejor enfoque. Como estoy usando ORM, necesitaba compilar el SQL en mi dialecto (Postgres) y luego ejecutarlo directamente desde la conexión (no desde la sesión) como se muestra arriba. La compilación "cómo" que encontré en esta otra pregunta stackoverflow.com/questions/4617291 . La mejora en la velocidad fue grande. El cambio de JOINS a SUBQUERIES también supuso un gran aumento en el rendimiento. También recomiendo usar sqlalchemy_mixins, el uso de smart_query ayudó mucho a construir la consulta más eficiente. github.com/absent1706/sqlalchemy-mixins
Gustavo Gonçalves
14

He estado buscando un recorrido / paginación eficiente con SQLAlchemy y me gustaría actualizar esta respuesta.

Creo que puede usar la llamada de segmento para limitar adecuadamente el alcance de una consulta y podría reutilizarla de manera eficiente.

Ejemplo:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start,stop = window_size*window_idx, window_size*(window_idx+1)
    things = query.slice(start, stop).all()
    if things is None:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1
Joel
fuente
Esto parece muy simple y rápido. No estoy seguro de que .all()sea ​​necesario. Noto que la velocidad mejoró mucho después de la primera llamada.
hamx0r
@ hamx0r Me doy cuenta de que este es un comentario antiguo, así que lo dejo para la posteridad. Sin .all()la variable de cosas hay una consulta que no admite len ()
David
9

En el espíritu de la respuesta de Joel, utilizo lo siguiente:

WINDOW_SIZE = 1000
def qgen(query):
    start = 0
    while True:
        stop = start + WINDOW_SIZE
        things = query.slice(start, stop).all()
        if len(things) == 0:
            break
        for thing in things:
            yield thing
        start += WINDOW_SIZE
Pietro Battiston
fuente
things = query.slice (start, stop) .all () devolverá [] al final y el bucle while nunca se romperá
Martin Reguly
4

Usar LIMIT / OFFSET es malo, porque necesita encontrar todas las columnas {OFFSET} antes, por lo que cuanto más grande es OFFSET, la solicitud más larga obtiene. El uso de la consulta en ventana para mí también da malos resultados en una tabla grande con una gran cantidad de datos (espera los primeros resultados durante demasiado tiempo, eso no es bueno en mi caso para una respuesta web fragmentada).

El mejor enfoque dado aquí https://stackoverflow.com/a/27169302/450103 . En mi caso, resolví el problema simplemente usando el índice en el campo de fecha y hora y obteniendo la siguiente consulta con datetime> = previous_datetime. Estúpido, porque usé ese índice en diferentes casos antes, pero pensé que para obtener todos los datos, la consulta en ventana sería mejor. En mi caso me equivoqué.

Víctor Gavro
fuente
3

AFAIK, la primera variante aún obtiene todas las tuplas de la tabla (con una consulta SQL) pero construye la presentación ORM para cada entidad cuando se itera. Por lo tanto, es más eficiente que crear una lista de todas las entidades antes de iterar, pero aún debe recuperar todos los datos (sin procesar) en la memoria.

Por lo tanto, usar LIMIT en tablas enormes me parece una buena idea.

Pankrat
fuente