¿Cómo UPSERT (FUSIONAR, INSERTAR ... EN ACTUALIZACIÓN DUPLICADA) en PostgreSQL?

268

Una pregunta muy frecuente aquí es cómo hacer un upsert, que es lo que MySQL llama INSERT ... ON DUPLICATE UPDATEy el estándar admite como parte de la MERGEoperación.

Dado que PostgreSQL no lo admite directamente (antes de la página 9.5), ¿cómo se hace esto? Considera lo siguiente:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Ahora imagine que usted quiere "upsert" las tuplas (2, 'Joe'), (3, 'Alan'), por lo que los nuevos contenidos de la tabla serían:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

De eso es de lo que habla la gente cuando habla de un upsert. De manera crucial, cualquier enfoque debe ser seguro en presencia de múltiples transacciones que trabajen en la misma mesa , ya sea mediante el bloqueo explícito o defendiéndose de las condiciones de carrera resultantes.

Este tema se discute ampliamente en Insertar, ¿en actualizaciones duplicadas en PostgreSQL? , pero se trata de alternativas a la sintaxis de MySQL, y ha crecido un poco de detalles no relacionados con el tiempo. Estoy trabajando en respuestas definitivas.

Estas técnicas también son útiles para "insertar si no existe, de lo contrario no hacer nada", es decir, "insertar ... ignorar clave duplicada".

Craig Ringer
fuente
1
posible duplicado de Insert, en actualización duplicada en PostgreSQL?
Michael Hampton
8
@MichaelHampton el objetivo aquí era crear una versión definitiva que no esté confundida por múltiples respuestas obsoletas, y bloqueada, para que nadie pueda hacer nada al respecto. No estoy de acuerdo con el voto cerrado.
Craig Ringer
Entonces, esto pronto quedaría obsoleto, y bloqueado, para que nadie pudiera hacer nada al respecto.
Michael Hampton
2
@MichaelHampton Si está preocupado, tal vez podría marcar el que se vinculó y pedir que se desbloquee para que pueda limpiarse, entonces podemos fusionarlo. Estoy harto de tener el único cierre obvio. as-dup para upsert es un desastre tan confuso e incorrecto.
Craig Ringer
1
¡Ese Q&A no está bloqueado!
Michael Hampton

Respuestas:

396

9.5 y más reciente:

PostgreSQL 9.5 y soporte más reciente INSERT ... ON CONFLICT UPDATE(y ON CONFLICT DO NOTHING), es decir, upsert.

Comparación conON DUPLICATE KEY UPDATE .

Explicación rápida .

Para el uso, consulte el manual , específicamente la cláusula conflict_action en el diagrama de sintaxis y el texto explicativo .

A diferencia de las soluciones para 9.4 y versiones anteriores que se proporcionan a continuación, esta función funciona con múltiples filas en conflicto y no requiere bloqueo exclusivo o un ciclo de reintento.

El commit que agrega la característica está aquí y la discusión sobre su desarrollo está aquí .


Si está en 9.5 y no necesita ser compatible con versiones anteriores, puede dejar de leer ahora .


9.4 y mayores:

PostgreSQL no tiene ninguna función incorporada UPSERT(o MERGE), y hacerlo de manera eficiente frente al uso concurrente es muy difícil.

Este artículo analiza el problema con detalles útiles .

En general, debe elegir entre dos opciones:

  • Operaciones individuales de inserción / actualización en un ciclo de reintento; o
  • Bloqueo de la mesa y combinación de lotes

Bucle de reintento de fila individual

El uso de upserts de fila individuales en un bucle de reintento es la opción razonable si desea que muchas conexiones intenten simultáneamente realizar inserciones.

La documentación de PostgreSQL contiene un procedimiento útil que le permitirá hacer esto en un bucle dentro de la base de datos . Protege contra actualizaciones perdidas e inserta carreras, a diferencia de la mayoría de las soluciones ingenuas. Sin READ COMMITTEDembargo, solo funcionará en modo y solo es seguro si es lo único que haces en la transacción. La función no funcionará correctamente si los disparadores o las teclas únicas secundarias causan violaciones únicas.

Esta estrategia es muy ineficiente. Siempre que sea práctico, debe poner en cola el trabajo y hacer un upsert masivo como se describe a continuación.

Muchos intentos de solución a este problema no consideran las reversiones, por lo que resultan en actualizaciones incompletas. Dos transacciones corren entre sí; uno de ellos con éxito INSERTs; el otro obtiene un error de clave duplicada y lo hace en su UPDATElugar. Los UPDATEbloques que esperan INSERTque retrocedan o se comprometan. Cuando se revierte, la UPDATEnueva comprobación de la condición coincide con cero filas, por lo que a pesar de las UPDATEconfirmaciones, en realidad no ha realizado la recuperación que esperaba. Debe verificar el recuento de filas de resultados y volver a intentarlo cuando sea necesario.

Algunas soluciones intentadas tampoco logran considerar las carreras SELECT. Si intentas lo obvio y simple:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

luego, cuando dos se ejecutan a la vez, hay varios modos de falla. Uno es el problema ya discutido con una nueva verificación de actualización. Otro es donde ambos UPDATEal mismo tiempo, coinciden con cero filas y continúan. Luego ambos hacen la EXISTSprueba, que ocurre antes del INSERT. Ambos obtienen cero filas, por lo que ambos hacen el INSERT. Uno falla con un error de clave duplicada.

Es por eso que necesita un bucle de reintento. Puede pensar que puede evitar errores clave duplicados o actualizaciones perdidas con SQL inteligente, pero no puede. Debe verificar los recuentos de filas o manejar errores clave duplicados (según el enfoque elegido) y volver a intentarlo.

Por favor, no presente su propia solución para esto. Al igual que con la cola de mensajes, probablemente esté mal.

Upsert a granel con cerradura

A veces, desea realizar una inserción ascendente masiva, donde tiene un nuevo conjunto de datos que desea fusionar en un conjunto de datos existente más antiguo. Esto es mucho más eficiente que las filas superiores individuales y debe preferirse siempre que sea práctico.

En este caso, normalmente sigue el siguiente proceso:

  • CREATEuna TEMPORARYmesa

  • COPY o inserte en masa los nuevos datos en la tabla temporal

  • LOCKla mesa de destino IN EXCLUSIVE MODE. Esto permite otras transacciones SELECT, pero no realiza ningún cambio en la tabla.

  • Realice uno UPDATE ... FROMde los registros existentes utilizando los valores de la tabla temporal;

  • Haga una INSERTde las filas que aún no existen en la tabla de destino;

  • COMMIT, soltando la cerradura.

Por ejemplo, para el ejemplo dado en la pregunta, usando valores múltiples INSERTpara llenar la tabla temporal:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

Lectura relacionada

¿Qué hay de MERGE?

El estándar SQL en MERGErealidad tiene una semántica de concurrencia mal definida y no es adecuado para la inserción sin bloquear primero una tabla.

Es una declaración OLAP realmente útil para la fusión de datos, pero en realidad no es una solución útil para la inserción segura de concurrencia. Hay muchos consejos para las personas que usan otros DBMS para usar MERGEen los upserts, pero en realidad está mal.

Otros DB:

Craig Ringer
fuente
En el upsert masivo, ¿hay algún valor en eliminar de nuevos valores en lugar de filtrar INSERT? Por ejemplo, con upd AS (ACTUALIZACIÓN ... REGRESANDO newvals.id) BORRAR DE newvals USANDO upd DONDE newvals.id = upd.id, seguido de una INSERTAR en la tabla de prueba SELECT * FROM newvals? Mi idea con esto: en lugar de filtrar dos veces en INSERT (para JOIN / WHERE y para la restricción única), reutilice los resultados de la verificación de existencia de UPDATE, que ya están en RAM, y pueden ser mucho más pequeños. Esto puede ser una ganancia si pocas filas coinciden y / o los nuevos valores son mucho más pequeños que los de la tabla de prueba.
Gunnlaugur Briem
1
Todavía hay problemas sin resolver y para los otros proveedores no está claro qué funciona y qué no. 1. La solución de bucle de Postgres como se indicó no funciona en el caso de múltiples claves únicas. 2. La clave duplicada en mysql tampoco funciona para múltiples claves únicas. 3. ¿Las otras soluciones para MySQL, SQL Server y Oracle publicadas anteriormente funcionan? ¿Son posibles las excepciones en esos casos y tenemos que hacer un bucle?
dan b
@danb Esto solo se trata realmente de PostgreSQL. No hay una solución de proveedores cruzados. La solución para PostgreSQL no funciona para varias filas, desafortunadamente debe hacer una transacción por fila. Las "soluciones" que se utilizan MERGEpara SQL Server y Oracle son incorrectas y propensas a las condiciones de carrera, como se indicó anteriormente. Tendrá que examinar cada DBMS específicamente para averiguar cómo manejarlos, realmente solo puedo ofrecer consejos sobre PostgreSQL. La única forma de hacer una inserción segura de varias filas en PostgreSQL será si se agrega soporte para la inserción nativa al servidor central.
Craig Ringer el
Incluso para PostGresQL, la solución no funciona en el caso de que una tabla tenga varias claves únicas (actualizando solo una fila). En ese caso, debe especificar qué clave se está actualizando. Puede haber una solución de proveedores cruzados usando jdbc por ejemplo.
dan b
2
Postgres ahora es compatible con UPSERT - git.postgresql.org/gitweb/…
Chris
32

Estoy tratando de contribuir con otra solución para el problema de inserción única con las versiones anteriores a 9.5 de PostgreSQL. La idea es simplemente intentar realizar primero la inserción y, en caso de que el registro ya esté presente, actualizarlo:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

Tenga en cuenta que esta solución solo se puede aplicar si no hay eliminaciones de filas de la tabla .

No sé sobre la eficiencia de esta solución, pero me parece bastante razonable.

Renzo
fuente
3
Gracias, eso es exactamente lo que estaba buscando. No puedo entender por qué fue tan difícil de encontrar.
isapir
44
Sí. Esta simplificación funciona si y solo si no hay eliminaciones.
Craig Ringer el
@CraigRinger ¿Puede explicar qué sucederá exactamente si se eliminan?
turbanoff
@turbanoff La inserción puede fallar porque el registro ya está allí, luego se elimina simultáneamente y la actualización afecta a cero filas porque la fila se eliminó.
Craig Ringer
@CraigRinger So. La eliminación se realiza simultáneamente . ¿Cuáles son los posibles outways si esto es funciona bien? Si la eliminación funciona simultáneamente, entonces puede ejecutarse justo después de nuestro bloqueo. Lo que estoy tratando de decir, si tenemos una eliminación simultánea, entonces este código funciona de la misma manera que el correctoinsert on update
turbante
30

Aquí hay algunos ejemplos para insert ... on conflict ...( pág. 9.5+ ):

  • Insertar, en conflicto, no hacer nada .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict do nothing;`  
  • Insertar, en conflicto - actualizar , especificar destino de conflicto a través de la columna .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict(id)
    do update set name = 'new_name', size = 3;  
  • Insertar, en conflicto: actualizar , especificar destino de conflicto mediante nombre de restricción .
    insert into dummy(id, name, size) values(1, 'new_name', 3)
    on conflict on constraint dummy_pkey
    do update set name = 'new_name', size = 4;
Eric Wang
fuente
gran respuesta: pregunta: ¿por qué o en qué situación se debe usar la especificación de destino a través de una columna o nombre de restricción? ¿Hay una ventaja / desventaja para varios casos de uso?
Nathan Benton
1
@NathanBenton Creo que hay al menos 2 diferencias: (1) el nombre de la columna es especificado por el programador, mientras que el nombre de la restricción puede ser especificado por el programador o generado por la base de datos de acuerdo con los nombres de tabla / columna. (2) cada columna puede tener múltiples restricciones. Dicho esto, depende de su caso elegir cuál usar.
Eric Wang
8

SQLAlchemy upsert para Postgres> = 9.5

Dado que la publicación grande anterior cubre muchos enfoques SQL diferentes para las versiones de Postgres (no solo no 9.5 como en la pregunta), me gustaría agregar cómo hacerlo en SQLAlchemy si está utilizando Postgres 9.5. En lugar de implementar su propio upsert, también puede usar las funciones de SQLAlchemy (que se agregaron en SQLAlchemy 1.1). Personalmente, recomendaría usar estos, si es posible. No solo por conveniencia, sino también porque le permite a PostgreSQL manejar cualquier condición de carrera que pueda ocurrir.

Publicación cruzada de otra respuesta que di ayer ( https://stackoverflow.com/a/44395983/2156909 )

SQLAlchemy es compatible ON CONFLICTahora con dos métodos on_conflict_do_update()y on_conflict_do_nothing():

Copiando de la documentación:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='[email protected]', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

PR
fuente
44
Python y SQLAlchemy no se mencionan en la pregunta.
Alexander Emelianov el
A menudo uso Python en las soluciones que escribo. Pero no he investigado SQLAlchemy (o lo sabía). Esta parece una opción elegante. Gracias. Si sale, lo presentaré a mi organización.
Robert
3
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Probado en Postgresql 9.3

aristar
fuente
@CraigRinger: ¿podrías explicar esto? no es el cte atómico?
París
2
@parisni No. Cada término CTE obtiene su propia instantánea si realiza escrituras. Además, no se realiza ningún tipo de bloqueo de predicados en filas que no se encontraron, por lo que aún pueden crearse simultáneamente en otra sesión. Si usara SERIALIZABLEaislamiento, obtendría un aborto con una falla de serialización, de lo contrario, probablemente obtendría una violación única. No reinventes upsert, la reinvención será incorrecta. Uso INSERT ... ON CONFLICT .... Si su PostgreSQL es demasiado viejo, actualícelo.
Craig Ringer
@CraigRinger INSERT ... ON CLONFLICT ...no está destinado a la carga masiva. Desde su publicación, LOCK TABLE testtable IN EXCLUSIVE MODE;dentro de un CTE es una solución para obtener cosas atómicas. No ?
Paris
@parisni ¿No está destinado a la carga masiva? ¿Dice quién? postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT . Claro, es mucho más lento que la carga masiva sin un comportamiento similar a un upsert, pero eso es obvio y será el caso sin importar lo que hagas. Es mucho más rápido que usar subtransacciones, eso es seguro. El enfoque más rápido es bloquear la tabla de destino y luego hacer una insert ... where not exists ...o similar, por supuesto.
Craig Ringer
1

Dado que esta pregunta se cerró, estoy publicando aquí sobre cómo lo haces usando SQLAlchemy. A través de la recursividad, vuelve a intentar una inserción o actualización masiva para combatir las condiciones de carrera y los errores de validación.

Primero las importaciones

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Ahora un par de funciones auxiliares

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

Y finalmente la función upsert

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

Así es como lo usas

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

La ventaja que tiene sobre esto bulk_save_objectses que puede manejar relaciones, verificación de errores, etc. en la inserción (a diferencia de las operaciones masivas ).

reubano
fuente
También me parece mal. ¿Qué sucede si una sesión concurrente inserta una fila después de recopilar su lista de ID? O elimina uno?
Craig Ringer el
buen punto @CraigRinger Hago algo similar a esto pero solo tengo 1 sesión realizando el trabajo. ¿Cuál es la mejor manera de manejar múltiples sesiones entonces? ¿Una transacción tal vez?
reubano
Las transacciones no son la solución mágica para todos los problemas de concurrencia. Puede usar SERIALIZABLE transacciones y manejar fallas de serialización, pero es lento. Necesita manejo de errores y un bucle de reintento. Vea mi respuesta y la sección de "lectura relacionada" en ella.
Craig Ringer el
@CraigRinger te tengo. De hecho, implementé un bucle de reintento en mi propio caso debido a otras fallas de validación. Actualizaré esta respuesta en consecuencia.
reubano