La documentación del multiprocessing
módulo muestra cómo pasar una cola a un proceso con el que se inició multiprocessing.Process
. Pero, ¿cómo puedo compartir una cola con los procesos de trabajo asincrónicos con los que se inició apply_async
? No necesito una unión dinámica ni nada más, solo una forma para que los trabajadores informen (repetidamente) de sus resultados a la base.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Esta falla con:
RuntimeError: Queue objects should only be shared between processes through inheritance
. Entiendo lo que esto significa y entiendo el consejo de heredar en lugar de requerir decapado / despegado (y todas las restricciones especiales de Windows). Pero, ¿cómo no me pase la cola de manera que las obras? No puedo encontrar un ejemplo y probé varias alternativas que fallaron de varias maneras. ¿Ayuda por favor?
queue.Queue()
no es adecuada para esto?queue.Queue
fue construido para subprocesos, usando bloqueos en memoria. En un entorno multiproceso, cada subproceso obtendría su propia copia de unaqueue.Queue()
instancia en su propio espacio de memoria, ya que los subprocesos no comparten memoria (en su mayoría).multiprocessing.Pool
ya tiene una cola de resultados compartida, no es necesario involucrar adicionalmente aManager.Queue
.Manager.Queue
es unaqueue.Queue
(cola de subprocesos múltiples) debajo del capó, ubicada en un proceso de servidor separado y expuesta a través de proxies. Esto agrega una sobrecarga adicional en comparación con la cola interna de Pool. Contrariamente a confiar en el manejo de resultados nativo de Pool,Manager.Queue
no se garantiza que los resultados en el también estén ordenados.Los procesos de trabajo no se inician con
.apply_async()
, esto ya sucede cuando crea una instanciaPool
. Lo que se inicia cuando llamaspool.apply_async()
es un nuevo "trabajo". Los procesos de trabajo de Pool ejecutan lamultiprocessing.pool.worker
función bajo el capó. Esta función se encarga de procesar las nuevas "tareas" transferidas a través del interno de PoolPool._inqueue
y de enviar los resultados al padre a través delPool._outqueue
. Su especificadofunc
se ejecutará dentromultiprocessing.pool.worker
.func
solo tiene que hacerreturn
algo y el resultado se enviará automáticamente al padre..apply_async()
Inmediatamente (asincrónicamente) devuelve unAsyncResult
objeto (alias deApplyResult
). Necesita llamar.get()
(está bloqueando) en ese objeto para recibir el resultado real. Otra opción sería registrar una función de devolución de llamada , que se activa tan pronto como el resultado esté listo.Salida de ejemplo:
Nota: Especificar el
timeout
parámetro -para.get()
no detendrá el procesamiento real de la tarea dentro del trabajador, solo desbloquea al padre en espera al generar unmultiprocessing.TimeoutError
.fuente
error_callback
parámetro -paraapply_async
, por lo que no ha cambiado mucho desde entonces.