Python sqlite3 y concurrencia

87

Tengo un programa de Python que usa el módulo "subprocesamiento". Una vez por segundo, mi programa inicia un nuevo hilo que recupera algunos datos de la web y los almacena en mi disco duro. Me gustaría usar sqlite3 para almacenar estos resultados, pero no puedo hacer que funcione. El problema parece estar relacionado con la siguiente línea:

conn = sqlite3.connect("mydatabase.db")
  • Si pongo esta línea de código dentro de cada hilo, obtengo un OperationalError que me dice que el archivo de la base de datos está bloqueado. Supongo que esto significa que otro hilo tiene mydatabase.db abierto a través de una conexión sqlite3 y lo ha bloqueado.
  • Si pongo esta línea de código en el programa principal y paso el objeto de conexión (conn) a cada hilo, obtengo un ProgrammingError, que dice que los objetos SQLite creados en un hilo solo pueden usarse en ese mismo hilo.

Anteriormente, almacenaba todos mis resultados en archivos CSV y no tenía ninguno de estos problemas de bloqueo de archivos. Con suerte, esto será posible con sqlite. ¿Algunas ideas?

RexE
fuente
5
Me gustaría señalar que las versiones más recientes de Python incluyen versiones más nuevas de sqlite3 que deberían solucionar este problema.
Ryan Fugger
@RyanFugger, ¿sabes cuál es la versión más antigua que admite esto? Estoy usando 2.7
notbad.jpeg
@RyanFugger AFAIK no hay una versión prediseñada que contenga una versión más nueva de SQLite3 que lo tenga arreglado. Sin embargo, puedes construir uno tú mismo.
shezi

Respuestas:

44

Puede utilizar el patrón consumidor-productor. Por ejemplo, puede crear una cola que se comparte entre subprocesos. El primer subproceso que obtiene datos de la web coloca estos datos en la cola compartida. Otro subproceso que posee la conexión de la base de datos quita los datos de la cola y los pasa a la base de datos.

Evgeny Lazin
fuente
8
FWIW: Las versiones posteriores de sqlite afirman que puede compartir conexiones y objetos a través de subprocesos (excepto cursores), pero he descubierto lo contrario en la práctica.
Richard Levasseur
Aquí hay un ejemplo de lo que Evgeny Lazin mencionó anteriormente.
dugres
4
Ocultar su base de datos detrás de una cola compartida es una solución realmente mala para esta pregunta porque SQL en general y SQLite específicamente ya tienen mecanismos de bloqueo incorporados, que probablemente sean mucho más refinados que cualquier cosa que pueda construir ad-hoc usted mismo.
shezi
1
Debe leer la pregunta, en ese momento no había mecanismos de bloqueo incorporados. Muchas bases de datos integradas contemporáneas carecen de este mecanismo por razones de rendimiento (por ejemplo: LevelDB).
Evgeny Lazin
180

Contrariamente a la creencia popular, las nuevas versiones de sqlite3 hacen apoyar el acceso de múltiples hilos.

Esto se puede habilitar mediante un argumento de palabra clave opcional check_same_thread:

sqlite.connect(":memory:", check_same_thread=False)
Jeremiah Rose
fuente
4
He encontrado excepciones impredecibles e incluso Python se bloquea con esta opción (Python 2.7 en Windows 32).
reclosedev
4
Según los documentos , en el modo de subprocesos múltiples, no se puede utilizar una única conexión de base de datos en varios subprocesos. También hay un modo serializado
Casebash
1
No importa, lo acabo de encontrar: http://sqlite.org/compile.html#threadsafe
Medeiros
1
@FrEaKmAn, lo siento, fue hace mucho tiempo, tampoco: memory: database. Después de eso, no compartí la conexión sqlite en varios hilos.
reclosedev
2
@FrEaKmAn, me encontré con esto, con el proceso de Python volcando el núcleo en el acceso de múltiples subprocesos. El comportamiento fue impredecible y no se registró ninguna excepción. Si mal no recuerdo, esto fue cierto tanto para lecturas como para escrituras. Esta es la única cosa que he visto realmente bloquear Python hasta ahora: D. No probé esto con sqlite compilado en modo seguro para subprocesos, pero en ese momento, no tenía la libertad de volver a compilar el sqlite predeterminado del sistema. Terminé haciendo algo similar a lo que Eric sugirió y deshabilité la compatibilidad de subprocesos
verboze
17

Lo siguiente que se encuentra en mail.python.org.pipermail.1239789

He encontrado la solución. No sé por qué la documentación de Python no tiene una sola palabra sobre esta opción. Entonces, tenemos que agregar un nuevo argumento de palabra clave a la función de conexión y podremos crear cursores a partir de él en diferentes hilos. Entonces usa:

sqlite.connect(":memory:", check_same_thread = False)

funciona perfectamente para mí. Por supuesto, de ahora en adelante, debo ocuparme del acceso seguro de múltiples subprocesos a la base de datos. De todos modos gracias a todos por intentar ayudar.

Robert Krolik
fuente
(Con el GIL, realmente no hay mucho en el camino del verdadero acceso multiproceso a la base de datos de todos modos que he visto)
Erik Aronesty
ADVERTENCIA : La documentación de Python tienen este que decir acerca de la check_same_threadopción: "Al utilizar múltiples hilos con la misma conexión de las operaciones de escritura debe ser serializado por el usuario para evitar la corrupción de datos." Entonces, sí, puede usar SQLite con múltiples subprocesos siempre que su código garantice que solo un subproceso puede escribir en la base de datos en un momento dado. Si no es así, puede dañar su base de datos.
Ajedi32
14

Cambie a multiprocesamiento . Es mucho mejor, se escala bien, puede ir más allá del uso de múltiples núcleos mediante el uso de múltiples CPU, y la interfaz es la misma que la del módulo de subprocesamiento de Python.

O, como sugirió Ali, simplemente use el mecanismo de agrupación de subprocesos de SQLAlchemy . Se encargará de todo automáticamente y tiene muchas funciones adicionales, solo para citar algunas de ellas:

  1. SQLAlchemy incluye dialectos para SQLite, Postgres, MySQL, Oracle, MS-SQL, Firebird, MaxDB, MS Access, Sybase e Informix; IBM también ha lanzado un controlador DB2. Por lo tanto, no tiene que reescribir su aplicación si decide alejarse de SQLite.
  2. El sistema Unit Of Work, una parte central del Object Relational Mapper (ORM) de SQLAlchemy, organiza las operaciones pendientes de creación / inserción / actualización / eliminación en colas y las descarga todas en un solo lote. Para lograr esto, realiza un "ordenamiento de dependencia" topológico de todos los elementos modificados en la cola para respetar las restricciones de clave externa, y agrupa las declaraciones redundantes donde a veces se pueden agrupar aún más. Esto produce la máxima eficiencia y seguridad en las transacciones y minimiza las posibilidades de interbloqueos.
nosklo
fuente
12

No deberías usar hilos en absoluto para esto. Esta es una tarea trivial para los retorcidos y, de todos modos, probablemente lo llevará significativamente más lejos.

Use solo un hilo y haga que la finalización de la solicitud active un evento para realizar la escritura.

twisted se encargará de la programación, devoluciones de llamada, etc ... por usted. Le entregará el resultado completo como una cadena, o puede ejecutarlo a través de un procesador de transmisión (tengo una API de Twitter y una API de friendfeed que activan eventos a las personas que llaman, ya que los resultados aún se descargan).

Dependiendo de lo que esté haciendo con sus datos, puede simplemente volcar el resultado completo en sqlite cuando esté completo, cocinarlo y volcarlo, o cocinarlo mientras se lee y volcarlo al final.

Tengo una aplicación muy simple que hace algo parecido a lo que quieres en github. Lo llamo pfetch (búsqueda en paralelo). Toma varias páginas en un horario, transmite los resultados a un archivo y, opcionalmente, ejecuta un script una vez que se completa con éxito cada una. También hace algunas cosas sofisticadas como GET condicionales, pero aún podría ser una buena base para lo que sea que esté haciendo.

Dustin
fuente
7

O si eres vago, como yo, puedes usar SQLAlchemy . Manejará el subproceso por usted ( usando subprocesos locales y algunas agrupaciones de conexiones ) y la forma en que lo hace es incluso configurable .

Para una ventaja adicional, si / cuando se da cuenta / decide que usar Sqlite para cualquier aplicación concurrente va a ser un desastre, no tendrá que cambiar su código para usar MySQL, Postgres o cualquier otra cosa. Puedes simplemente cambiar.

Ali Afshar
fuente
1
¿Por qué no especifica la versión de Python en ninguna parte del sitio web oficial?
Nombre para mostrar el
3

Debe usar session.close()después de cada transacción a la base de datos para usar el mismo cursor en el mismo hilo, sin usar el mismo cursor en varios hilos que causan este error.

Hazem Khaled
fuente
1

Utilice threading.Lock ()

Alexandr
fuente
1
Proporcione lo que hace el siguiente código y dónde debe usarse.
Ali Akhtari
0

Me gusta la respuesta de Evgeny: las colas son generalmente la mejor manera de implementar la comunicación entre subprocesos. Para completar, aquí hay algunas otras opciones:

  • Cierre la conexión DB cuando los subprocesos generados hayan terminado de usarla. Esto solucionaría su problema OperationalError, pero abrir y cerrar conexiones como esta es generalmente un No-No, debido a la sobrecarga de rendimiento.
  • No use subprocesos secundarios. Si la tarea de una vez por segundo es razonablemente liviana, podría salirse con la suya haciendo la búsqueda y almacenamiento, y luego durmiendo hasta el momento adecuado. Esto no es deseable, ya que las operaciones de recuperación y almacenamiento podrían tardar más de 1 segundo y perderá el beneficio de los recursos multiplexados que tiene con un enfoque de subprocesos múltiples.
James Brady
fuente
0

Necesita diseñar la simultaneidad para su programa. SQLite tiene limitaciones claras y debe obedecerlas, consulte las preguntas frecuentes (también la siguiente pregunta).

iny
fuente
0

Scrapy parece una posible respuesta a mi pregunta. Su página de inicio describe mi tarea exacta. (Aunque todavía no estoy seguro de qué tan estable es el código).

RexE
fuente
0

Echaría un vistazo al módulo Python y_serial para la persistencia de datos: http://yserial.sourceforge.net

que maneja problemas de interbloqueo que rodean una sola base de datos SQLite. Si la demanda de simultaneidad aumenta, se puede configurar fácilmente la clase Farm de muchas bases de datos para difundir la carga en el tiempo estocástico.

Espero que esto ayude a su proyecto ... debería ser lo suficientemente simple de implementar en 10 minutos.

código43
fuente
0

No pude encontrar ningún punto de referencia en ninguna de las respuestas anteriores, así que escribí una prueba para comparar todo.

Probé 3 enfoques

  1. Leer y escribir secuencialmente desde la base de datos SQLite
  2. Usando un ThreadPoolExecutor para leer / escribir
  3. Usando un ProcessPoolExecutor para leer / escribir

Los resultados y conclusiones del índice de referencia son los siguientes

  1. Las lecturas / escrituras secuenciales funcionan mejor
  2. Si debe procesar en paralelo, use ProcessPoolExecutor para leer en paralelo
  3. No realice ninguna escritura utilizando el ThreadPoolExecutor o el ProcessPoolExecutor, ya que se encontrará con errores bloqueados en la base de datos y tendrá que volver a intentar insertar el fragmento nuevamente.

Puede encontrar el código y la solución completa para los puntos de referencia en mi respuesta SO AQUÍ ¡ Espero que ayude!

PirateApp
fuente
-1

La razón más probable por la que obtiene errores con bases de datos bloqueadas es que debe emitir

conn.commit()

después de terminar una operación de base de datos. Si no lo hace, su base de datos se bloqueará contra escritura y permanecerá así. Los otros subprocesos que están esperando para escribir expirarán después de un tiempo (el valor predeterminado es de 5 segundos, consulte http://docs.python.org/2/library/sqlite3.html#sqlite3.connect para obtener detalles sobre eso) .

Un ejemplo de inserción correcta y concurrente sería este:

import threading, sqlite3
class InsertionThread(threading.Thread):

    def __init__(self, number):
        super(InsertionThread, self).__init__()
        self.number = number

    def run(self):
        conn = sqlite3.connect('yourdb.db', timeout=5)
        conn.execute('CREATE TABLE IF NOT EXISTS threadcount (threadnum, count);')
        conn.commit()

        for i in range(1000):
            conn.execute("INSERT INTO threadcount VALUES (?, ?);", (self.number, i))
            conn.commit()

# create as many of these as you wish
# but be careful to set the timeout value appropriately: thread switching in
# python takes some time
for i in range(2):
    t = InsertionThread(i)
    t.start()

Si le gusta SQLite, o tiene otras herramientas que funcionan con bases de datos SQLite, o desea reemplazar archivos CSV con archivos SQLite db, o debe hacer algo raro como IPC entre plataformas, entonces SQLite es una gran herramienta y muy adecuada para el propósito. ¡No se deje presionar para usar una solución diferente si no se siente bien!

shezi
fuente