Inserto a granel con SQLAlchemy ORM

130

¿Hay alguna manera de hacer que SQLAlchemy haga una inserción masiva en lugar de insertar cada objeto individual? es decir,

haciendo:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

más bien que:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

Acabo de convertir un código para usar sqlalchemy en lugar de sql sin procesar y, aunque ahora es mucho más agradable trabajar con él, parece ser más lento ahora (hasta un factor de 10), me pregunto si esta es la razón.

Puede ser que pueda mejorar la situación usando sesiones de manera más eficiente. Por el momento tengo autoCommit=Falsey hago un session.commit()después de haber agregado algunas cosas. Aunque esto parece hacer que los datos se vuelvan obsoletos si la base de datos se cambia en otro lugar, como si incluso hiciera una nueva consulta, ¿todavía obtengo resultados antiguos?

¡Gracias por tu ayuda!

Nick Holden
fuente
1
Esto podría ayudar: stackoverflow.com/questions/270879/…
Sean Vieira
1
Nick, entiendo que esta es una publicación muy antigua. ¿Sería posible actualizar el título a algo correcto como "inserción de registro múltiple con SQLAlchemy ORM". Las declaraciones de inserción de registros múltiples como la que ha proporcionado son bastante diferentes de las operaciones de carga masiva a nivel de base de datos. Las inserciones masivas están destinadas a cargas de datos de 1k +, generalmente de grandes conjuntos de datos y realizadas por gerentes de aplicaciones, no operaciones REST o código de nivel de aplicación ... Usemos nuestra nomenclatura correctamente.
W4t3randWind
Para aquellos que se topan con esta pregunta mientras buscan información sobre operaciones masivas en sqlalchemy Core (no ORM), vea mi respuesta a otra pregunta .
Nickolay

Respuestas:

173

SQLAlchemy introdujo eso en la versión 1.0.0:

Operaciones masivas - SQLAlchemy docs

¡Con estas operaciones, ahora puede hacer inserciones o actualizaciones masivas!

Por ejemplo, puedes hacer:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Aquí, se realizará un inserto a granel.

Pierre
fuente
30
También necesita s.commit () para guardar realmente los registros (me tomó un poco entender esto).
horcle_buzz
3
Intenté esto con sqlachemy 1.0.11 y todavía hace 3 declaraciones de inserción. Pero es mucho más rápido que las operaciones normales de orm.
zidarsk8
3
Si bien no es pertinente a la pregunta de los OP, vale la pena mencionar que esto rompe ciertas características del ORM. docs.sqlalchemy.org/en/rel_1_0/orm/…
danger
@dangel sí, gracias por publicar esto. Aunque el título de OP se refiere a "carga masiva", su pregunta sobre las declaraciones de inserción de múltiples registros no tiene nada que ver con la función de carga masiva de sqlalchemy.
W4t3randWind
En comparación con la inserción de los mismos datos de CSV con \copypsql (del mismo cliente al mismo servidor), veo una gran diferencia en el rendimiento en el lado del servidor que produce aproximadamente 10 veces más inserciones / s. Aparentemente, es una carga masiva usando \copy(o COPYen el servidor) usando un paquete para comunicarse de cliente a servidor MUCHO mejor que usar SQL a través de SQLAlchemy. Más información: Gran diferencia de rendimiento de inserción masiva PostgreSQL frente a ... .
gertvdijk
42

Los documentos de sqlalchemy tienen un resumen del rendimiento de varias técnicas que se pueden usar para inserciones en masa:

Básicamente, los ORM no están destinados a inserciones masivas de alto rendimiento; esta es la razón por la que SQLAlchemy ofrece el Core además del ORM como un componente de primera clase.

Para el caso de uso de inserciones masivas rápidas, el sistema de generación y ejecución de SQL que el ORM construye encima es parte del Core. Al usar este sistema directamente, podemos producir un INSERT que sea competitivo con el uso directo de la API de base de datos sin procesar.

Alternativamente, SQLAlchemy ORM ofrece el conjunto de métodos Bulk Operations, que proporciona enlaces en subsecciones del proceso de la unidad de trabajo para emitir construcciones INSERT y UPDATE de nivel central con un pequeño grado de automatización basada en ORM.

El siguiente ejemplo ilustra las pruebas basadas en el tiempo para varios métodos diferentes de inserción de filas, desde el más automatizado hasta el menos. Con cPython 2.7, los tiempos de ejecución observaron:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Guión:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Grant Humphries
fuente
1
Gracias. Realmente servicial y minucioso.
Steve B.
Vi otro ejemplo usando bindparams. La sintaxis se ve sucinta, ¿está bien?
Jay
35

Hasta donde yo sé, no hay forma de que el ORM emita inserciones masivas. Creo que la razón subyacente es que SQLAlchemy necesita realizar un seguimiento de la identidad de cada objeto (es decir, nuevas claves primarias), y las inserciones masivas interfieren con eso. Por ejemplo, suponiendo que su footabla contiene una idcolumna y está asignada a una Fooclase:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Como SQLAlchemy recogió el valor x.idsin emitir otra consulta, podemos inferir que obtuvo el valor directamente de la INSERTdeclaración. Si no necesita acceso posterior a los objetos creados a través de las mismas instancias, puede omitir la capa ORM para su inserción:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy no puede hacer coincidir estas nuevas filas con ningún objeto existente, por lo que tendrá que consultarlas nuevamente para cualquier operación posterior.

En lo que respecta a los datos obsoletos, es útil recordar que la sesión no tiene una forma integrada de saber cuándo se cambia la base de datos fuera de la sesión. Para acceder a datos modificados externamente a través de instancias existentes, las instancias deben marcarse como caducadas . Esto ocurre de forma predeterminada en session.commit(), pero se puede hacer manualmente llamando session.expire_all()o session.expire(instance). Un ejemplo (SQL omitido):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit() expira x , por lo que la primera declaración de impresión abre implícitamente una nueva transacción y vuelve a consultar xlos atributos. Si comenta la primera declaración impresa, notará que la segunda ahora recoge el valor correcto, porque la nueva consulta no se emite hasta después de la actualización.

Esto tiene sentido desde el punto de vista del aislamiento transaccional: solo debe recoger modificaciones externas entre transacciones. Si esto le está causando problemas, sugeriría aclarar o repensar los límites de transacción de su aplicación en lugar de buscarlos de inmediato session.expire_all().

dhaffey
fuente
Gracias por su respuesta, voy a intentarlo. WRT el problema que expira, lo que vi no era lo mismo. Estoy usando una sesión de alcance en turbogears. Al realizar una consulta getSession (). Query (Foo) .filter .... all () devolvió cosas diferentes según la solicitud, tampoco devolvió los registros actualizados que estaban en el db hasta que lo reinicié. Solucioné este problema haciendo un autocommit = True y agregando algo que .remove () d la sesión después de que se completó la solicitud (supongo que de todos modos debes hacerlo).
Nick Holden
¿Supongo que devolvió diferentes cosas dependiendo de la solicitud porque tenía una sesión de alcance por hilo en el grupo y las sesiones estaban en diferentes estados? Sin embargo, parecía un poco extraño que sa no obtuviera nuevos datos después de una nueva solicitud. Supongo que no entiendo lo que está haciendo Autocommit = False
Nick Holden
Con autocommit=False, creo que debería llamar session.commit()al completar la solicitud (no estoy familiarizado con TurboGears, así que ignore esto si se maneja para usted a nivel de marco). Además de asegurarse de que sus cambios se hayan realizado en la base de datos, esto expiraría todo en la sesión. La próxima transacción no comenzaría hasta el próximo uso de esa sesión, por lo que las solicitudes futuras en el mismo hilo no verían datos obsoletos.
dhaffey
10
Estilo alternativo:session.execute(Foo.__table__.insert(), values)
Joril
66
Tenga en cuenta que las versiones más recientes de sqlalchemy tienen capacidades de inserción masiva: docs.sqlalchemy.org/en/latest/orm/…
Wayne Werner
18

Usualmente lo hago usando add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
reubano
fuente
2
¿Estás seguro de que esto funciona? ¿No solo equivale .adda llevarlos a la sesión uno a la vez?
Alec
Eso sería contrario a la intuición dado el nombre del método, los documentos no entran en detalles: Add the given collection of instances to this Session.¿Tiene alguna razón para creer que no hace una inserción masiva?
reubano
3
No creo que sea demasiado contradictorio: de hecho, agrega todas las cosas que le pides. Nada de agregar todas las cosas a la sesión parece implicar qué sentencias SQL subyacentes se emiten. Mirando la fuente: github.com/zzzeek/sqlalchemy/blob/… , de hecho, parece .addcada elemento individualmente.
Alec
Funciona bien, en comparación con bulk_save_objects(), con a flush(), podemos obtener la identificación del objeto, pero bulk_save_objects()no podemos (evento con flush()llamado).
coanor
14

Se agregó soporte directo a SQLAlchemy a partir de la versión 0.8

Según los documentos , connection.execute(table.insert().values(data))debería hacer el truco. (Tenga en cuenta que esto no es mismo connection.execute(table.insert(), data)que resulta en muchas inserciones de fila individuales a través de una llamada a executemany). En cualquier cosa que no sea una conexión local, la diferencia en el rendimiento puede ser enorme.

usuario3805082
fuente
10

SQLAlchemy introdujo eso en la versión 1.0.0:

Operaciones masivas - SQLAlchemy docs

¡Con estas operaciones, ahora puede hacer inserciones o actualizaciones masivas!

Por ejemplo (si desea la sobrecarga más baja para los INSERT de tabla simple), puede usar Session.bulk_insert_mappings():

loadme = [(1, 'a'),
          (2, 'b'),
          (3, 'c')]
dicts = [dict(bar=t[0], fly=t[1]) for t in loadme]

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

O, si lo desea, omita las loadmetuplas y escriba los diccionarios directamente dicts(pero me resulta más fácil dejar toda la palabrería de los datos y cargar una lista de diccionarios en un bucle).

juanitogan
fuente
7

La respuesta de Piere es correcta, pero un problema es que, bulk_save_objectspor defecto, no devuelve las claves principales de los objetos, si eso le preocupa. Establecer return_defaultsenTrue obtener este comportamiento.

La documentación está aquí .

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Matthew Moisen
fuente
2
Se debe tener precaución con la bandera. Insertará un objeto a la vez secuencialmente y la ganancia de rendimiento significativa puede no estar allí [1]. En mi caso, el rendimiento se degradó, lo que sospeché debido a la sobrecarga. [1]: docs.sqlalchemy.org/en/13/orm/…
dhfromkorea el
6

Todos los caminos conducen a Roma , pero algunos de ellos cruzan montañas, requieren transbordadores, pero si desea llegar rápidamente, tome la autopista.


En este caso, la autopista debe utilizar la función execute_batch () de psycopg2 . La documentación lo dice lo mejor:

La implementación actual de executemany()es (utilizando un eufemismo extremadamente caritativo) no está funcionando particularmente. Estas funciones se pueden usar para acelerar la ejecución repetida de una declaración contra un conjunto de parámetros. Al reducir el número de viajes de ida y vuelta del servidor, el rendimiento puede ser mucho mayor que el uso executemany().

En mi propia prueba execute_batch()es aproximadamente el doble de rápido queexecutemany() , y le da la opción de configurar el tamaño_página para ajustar aún más (si usted quiere exprimir el último 2-3% de rendimiento del conductor).

La misma característica se puede habilitar fácilmente si está utilizando SQLAlchemy configurando use_batch_mode=Truecomo parámetro cuando crea una instancia del motor concreate_engine()

chjortlund
fuente
Nota: ¡psycopg2 execute_valueses más rápido que psycopg2 execute_batchcuando se hacen inserciones masivas!
Fierr
5

Esta es una manera:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

Esto se insertará así:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Referencia: Las preguntas frecuentes de SQLAlchemy incluyen puntos de referencia para varios métodos de confirmación.

Eefret
fuente
3

La mejor respuesta que encontré hasta ahora fue en la documentación de sqlalchemy:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

Hay un ejemplo completo de un punto de referencia de posibles soluciones.

Como se muestra en la documentación:

bulk_save_objects no es la mejor solución, pero su rendimiento es correcto.

La segunda mejor implementación en términos de legibilidad creo que fue con SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

El contexto de esta función se proporciona en el artículo de documentación.

lelabo_m
fuente