Cuando usarlos
Si necesita más de dos puntos para comunicarse, use a Queue()
.
Si necesita un rendimiento absoluto, a Pipe()
es mucho más rápido porque Queue()
está construido encima Pipe()
.
Benchmarking de rendimiento
Supongamos que desea generar dos procesos y enviar mensajes entre ellos lo más rápido posible. Estos son los resultados de una carrera de resistencia entre pruebas similares usando Pipe()
y Queue()
... Esto está en un ThinkpadT61 con Ubuntu 11.10 y Python 2.7.2.
Para su información, arrojé resultados JoinableQueue()
como un bono; JoinableQueue()
da cuenta de las tareas cuando queue.task_done()
se llama (ni siquiera sabe acerca de la tarea específica, solo cuenta las tareas no finalizadas en la cola), por lo que queue.join()
sabe que el trabajo ha terminado.
El código para cada uno en la parte inferior de esta respuesta ...
mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$
En resumen, Pipe()
es aproximadamente tres veces más rápido que a Queue()
. Ni siquiera pienses en eso a JoinableQueue()
menos que realmente tengas los beneficios.
BONO MATERIAL 2
El multiprocesamiento introduce cambios sutiles en el flujo de información que dificultan la depuración a menos que conozca algunos atajos. Por ejemplo, es posible que tenga un script que funcione bien cuando indexa a través de un diccionario en muchas condiciones, pero con poca frecuencia falla con ciertas entradas.
Normalmente obtenemos pistas sobre la falla cuando todo el proceso de Python falla; sin embargo, no obtendrá huellas de fallas no solicitadas impresas en la consola si la función de multiprocesamiento falla. Rastrear bloqueos de multiprocesamiento desconocidos es difícil sin una pista de lo que bloqueó el proceso.
La forma más sencilla que he encontrado para rastrear la información de bloqueo de multiprocesamiento es envolver toda la función de multiprocesamiento en un try
/ except
y usar traceback.print_exc()
:
import traceback
def run(self, args):
try:
# Insert stuff to be multiprocessed here
return args[0]['that']
except:
print "FATAL: reader({0}) exited while multiprocessing".format(args)
traceback.print_exc()
Ahora, cuando encuentras un bloqueo, ves algo como:
FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
File "foo.py", line 19, in __init__
self.run(args)
File "foo.py", line 46, in run
KeyError: 'that'
Código fuente:
"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time
def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close() # We are only reading
while True:
msg = p_output.recv() # Read from the output pipe and do nothing
if msg=='DONE':
break
def writer(count, p_input):
for ii in xrange(0, count):
p_input.send(ii) # Write 'count' numbers into the input pipe
p_input.send('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints: p_input ------> p_output
p_output, p_input = Pipe() # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
p_output.close() # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_queue.py
"""
from multiprocessing import Process, Queue
import time
import sys
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start() # Launch reader_proc() as a separate python process
_start = time.time()
writer(count, pqueue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))
"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time
def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get() # Read from the queue and do nothing
queue.task_done()
def writer(count, queue):
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start() # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join() # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))
Una característica adicional de
Queue()
eso vale la pena señalar es el hilo alimentador. Esta sección señala "Cuando un proceso coloca por primera vez un elemento en la cola, se inicia un hilo alimentador que transfiere objetos desde un búfer a la tubería". Se puede insertar un número infinito de elementos (o tamaño máximo)Queue()
sin llamadas alqueue.put()
bloqueo. Esto le permite almacenar múltiples elementos en unQueue()
, hasta que su programa esté listo para procesarlos.Pipe()
, por otro lado, tiene una cantidad finita de almacenamiento para los elementos que se han enviado a una conexión, pero que no se han recibido de la otra conexión. Después de que este almacenamiento se haya agotado, las llamadas aconnection.send()
se bloquearán hasta que haya espacio para escribir todo el elemento. Esto detendrá el hilo que escribe hasta que otro hilo se lea de la tubería.Connection
los objetos le dan acceso al descriptor de archivo subyacente. En los sistemas * nix, puede evitar que lasconnection.send()
llamadas se bloqueen utilizando laos.set_blocking()
función. Sin embargo, esto causará problemas si intenta enviar un solo elemento que no cabe en el archivo de la tubería. Las versiones recientes de Linux le permiten aumentar el tamaño de un archivo, pero el tamaño máximo permitido varía según las configuraciones del sistema. Por lo tanto, nunca debe confiar en losPipe()
datos del búfer. Llamadas aconnection.send
podría bloquearse hasta que los datos se lean de la tubería en otro lugar.En conclusión, Queue es una mejor opción que pipe cuando necesita almacenar datos en el búfer. Incluso cuando solo necesitas comunicarte entre dos puntos.
fuente