Tengo muchos problemas para tratar de entender cómo funciona la cola de multiprocesamiento en Python y cómo implementarla. Digamos que tengo dos módulos de Python que acceden a datos desde un archivo compartido, llamemos a estos dos módulos un escritor y un lector. Mi plan es que tanto el lector como el escritor coloquen solicitudes en dos colas de multiprocesamiento separadas, y luego hacer que un tercer proceso muestre estas solicitudes en un bucle y las ejecute como tal.
Mi principal problema es que realmente no sé cómo implementar multiprocessing.queue correctamente, realmente no puede instanciar el objeto para cada proceso ya que serán colas separadas, ¿cómo se asegura de que todos los procesos se relacionen con una cola compartida (o en este caso, colas)
fuente
Respuestas:
Este es un ejemplo simple de un lector y un escritor que comparten una sola cola ... El escritor envía un montón de números enteros al lector; cuando el escritor se queda sin números, envía 'DONE', lo que le permite al lector saber que debe salir del ciclo de lectura.
from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
fuente
en "
from queue import Queue
" no hay ningún módulo llamadoqueue
, en su lugarmultiprocessing
debe usarse. Por lo tanto, debería verse como "from multiprocessing import Queue
"fuente
multiprocessing.Queue
lleva años, el uso es correcto. Lo normalQueue.Queue
se usa para hilos de Python . Cuando intente utilizarQueue.Queue
con multiprocesamiento, se crearán copias del objeto Cola en cada proceso secundario y los procesos secundarios nunca se actualizarán. Básicamente,Queue.Queue
funciona con un objeto compartido global ymultiprocessing.Queue
funciona con IPC. Ver: stackoverflow.com/questions/925100/…Aquí hay un uso muy simple de
multiprocessing.Queue
ymultiprocessing.Process
que permite a las personas que llaman enviar un "evento" más argumentos a un proceso separado que envía el evento a un método "do_" en el proceso. (Python 3.4+)import multiprocessing as mp import collections Msg = collections.namedtuple('Msg', ['event', 'args']) class BaseProcess(mp.Process): """A process backed by an internal queue for simple one-way message passing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.queue = mp.Queue() def send(self, event, *args): """Puts the event and args as a `Msg` on the queue """ msg = Msg(event, args) self.queue.put(msg) def dispatch(self, msg): event, args = msg handler = getattr(self, "do_%s" % event, None) if not handler: raise NotImplementedError("Process has no handler for [%s]" % event) handler(*args) def run(self): while True: msg = self.queue.get() self.dispatch(msg)
Uso:
class MyProcess(BaseProcess): def do_helloworld(self, arg1, arg2): print(arg1, arg2) if __name__ == "__main__": process = MyProcess() process.start() process.send('helloworld', 'hello', 'world')
El
send
sucede en el proceso principal, lado_*
que ocurre en el proceso hijo.Dejé fuera cualquier manejo de excepciones que obviamente interrumpiría el ciclo de ejecución y saldría del proceso hijo. También puede personalizarlo anulando
run
para controlar el bloqueo o cualquier otra cosa.Esto realmente solo es útil en situaciones en las que tiene un solo proceso de trabajo, pero creo que es una respuesta relevante a esta pregunta para demostrar un escenario común con un poco más de orientación a objetos.
fuente
Eché un vistazo a varias respuestas en el desbordamiento de pila y en la web mientras intentaba configurar una forma de hacer multiprocesamiento utilizando colas para pasar grandes marcos de datos de pandas. Me pareció que cada respuesta estaba reiterando el mismo tipo de soluciones sin tener en cuenta la multitud de casos extremos que uno definitivamente encontrará al configurar cálculos como estos. El problema es que hay muchas cosas en juego al mismo tiempo. El número de tareas, el número de trabajadores, la duración de cada tarea y las posibles excepciones durante la ejecución de la tarea. Todos estos hacen que la sincronización sea complicada y la mayoría de las respuestas no abordan cómo puede hacerlo. Así que esta es mi opinión después de jugar durante unas horas, espero que sea lo suficientemente genérico para que la mayoría de la gente lo encuentre útil.
Algunas reflexiones antes de cualquier ejemplo de codificación. Dado que
queue.Empty
oqueue.qsize()
cualquier otro método similar no es confiable para el control de flujo, cualquier código similarwhile True: try: task = pending_queue.get_nowait() except queue.Empty: break
es falso. Esto matará al trabajador incluso si milisegundos más tarde aparece otra tarea en la cola. El trabajador no se recuperará y después de un tiempo TODOS los trabajadores desaparecerán ya que al azar encuentran la cola momentáneamente vacía. El resultado final será que la función principal de multiprocesamiento (la que tiene la combinación () en los procesos) regresará sin que se hayan completado todas las tareas. Agradable. Buena suerte depurando eso si tienes miles de tareas y faltan algunas.
El otro problema es el uso de valores centinela. Mucha gente ha sugerido agregar un valor centinela en la cola para marcar el final de la cola. Pero para señalarlo a quién exactamente? Si hay N trabajadores, asumiendo que N es el número de núcleos disponibles más o menos, entonces un solo valor centinela solo marcará el final de la cola a un trabajador. Todos los demás trabajadores se quedarán sentados esperando más trabajo cuando no quede ninguno. Los ejemplos típicos que he visto son
while True: task = pending_queue.get() if task == SOME_SENTINEL_VALUE: break
Un trabajador obtendrá el valor centinela mientras que el resto esperará indefinidamente. Ninguna publicación con la que me encontré mencionó que debe enviar el valor centinela a la cola AL MENOS tantas veces como trabajadores para que TODOS lo obtengan.
El otro problema es el manejo de excepciones durante la ejecución de la tarea. Nuevamente, estos deben ser capturados y manejados. Además, si tiene una
completed_tasks
cola, debe contar de forma independiente y determinista cuántos elementos hay en la cola antes de decidir que el trabajo está terminado. De nuevo, depender del tamaño de las colas está destinado a fallar y arrojar resultados inesperados.En el siguiente ejemplo, la
par_proc()
función recibirá una lista de tareas que incluye las funciones con las que estas tareas deben ejecutarse junto con los argumentos y valores nombrados.import multiprocessing as mp import dill as pickle import queue import time import psutil SENTINEL = None def do_work(tasks_pending, tasks_completed): # Get the current worker's name worker_name = mp.current_process().name while True: try: task = tasks_pending.get_nowait() except queue.Empty: print(worker_name + ' found an empty queue. Sleeping for a while before checking again...') time.sleep(0.01) else: try: if task == SENTINEL: print(worker_name + ' no more work left to be done. Exiting...') break print(worker_name + ' received some work... ') time_start = time.perf_counter() work_func = pickle.loads(task['func']) result = work_func(**task['task']) tasks_completed.put({work_func.__name__: result}) time_end = time.perf_counter() - time_start print(worker_name + ' done in {} seconds'.format(round(time_end, 5))) except Exception as e: print(worker_name + ' task failed. ' + str(e)) tasks_completed.put({work_func.__name__: None}) def par_proc(job_list, num_cpus=None): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Use as many workers as there are cores (usually chokes the system so better use less) num_workers = num_cpus # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 for p in processes: p.join() return results
Y aquí hay una prueba para ejecutar el código anterior contra
def test_parallel_processing(): def heavy_duty1(arg1, arg2, arg3): return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert job1 == 15 assert job2 == 21
más otro con algunas excepciones
def test_parallel_processing_exceptions(): def heavy_duty1_raises(arg1, arg2, arg3): raise ValueError('Exception raised') return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert not job1 assert job2 == 21
Espero que sea de ayuda.
fuente
Implementamos dos versiones de esto, una simple agrupación de múltiples subprocesos que puede ejecutar muchos tipos de invocables, lo que nos hace la vida mucho más fácil y la segunda versión que usa procesos , que es menos flexible en términos de invocables y requiere una llamada adicional a eneldo.
Establecer frozen_pool en true congelará la ejecución hasta que se llame a finish_pool_queue en cualquiera de las clases.
Versión del hilo:
''' Created on Nov 4, 2019 @author: Kevin ''' from threading import Lock, Thread from Queue import Queue import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os class ThreadPool(object): def __init__(self, queue_threads, *args, **kwargs): self.frozen_pool = kwargs.get('frozen_pool', False) self.print_queue = kwargs.get('print_queue', True) self.pool_results = [] self.lock = Lock() self.queue_threads = queue_threads self.queue = Queue() self.threads = [] for i in range(self.queue_threads): t = Thread(target=self.make_pool_call) t.daemon = True t.start() self.threads.append(t) def make_pool_call(self): while True: if self.frozen_pool: #print '--> Queue is frozen' sleep(1) continue item = self.queue.get() if item is None: break call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.lock.acquire() self.pool_results.append((item, result)) self.lock.release() except Exception as e: self.lock.acquire() print e traceback.print_exc() self.lock.release() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self): self.frozen_pool = False while self.queue.unfinished_tasks > 0: if self.print_queue: print_info('--> Thread pool... %s' % self.queue.unfinished_tasks) sleep(5) self.queue.join() for i in range(self.queue_threads): self.queue.put(None) for t in self.threads: t.join() del self.threads[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Versión de proceso:
''' Created on Nov 4, 2019 @author: Kevin ''' import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\ RawArray, Manager from dill import dill import ctypes from helium.misc.utils import ignore_exception from mem_top import mem_top import gc class ProcessPool(object): def __init__(self, queue_processes, *args, **kwargs): self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False)) self.print_queue = kwargs.get('print_queue', True) self.manager = Manager() self.pool_results = self.manager.list() self.queue_processes = queue_processes self.queue = JoinableQueue() self.processes = [] for i in range(self.queue_processes): p = Process(target=self.make_pool_call) p.start() self.processes.append(p) print 'Processes', self.queue_processes def make_pool_call(self): while True: if self.frozen_pool.value: sleep(1) continue item_pickled = self.queue.get() if item_pickled is None: #print '--> Ending' self.queue.task_done() break item = dill.loads(item_pickled) call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.pool_results.append(dill.dumps((item, result))) else: del call, args, kwargs, keep_results, item, result except Exception as e: print e traceback.print_exc() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self, callable=None): self.frozen_pool.value = False while self.queue._unfinished_tasks.get_value() > 0: if self.print_queue: print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value())) if callable: callable() sleep(5) for i in range(self.queue_processes): self.queue.put(None) self.queue.join() self.queue.close() for p in self.processes: with ignore_exception: p.join(10) with ignore_exception: p.terminate() with ignore_exception: del self.processes[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Llame con:
tp = ThreadPool(queue_threads=2) tp.queue.put({'call': test, 'args': [random.randint(0, 100)]}) tp.finish_pool_queue()
o
pp = ProcessPool(queue_processes=2) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.finish_pool_queue()
fuente
Acabo de hacer un ejemplo simple y general para demostrar el paso de un mensaje a través de una cola entre 2 programas independientes. No responde directamente a la pregunta del OP, pero debería ser lo suficientemente claro como para indicar el concepto.
Servidor:
multiprocessing-queue-manager-server.py
import asyncio import concurrent.futures import multiprocessing import multiprocessing.managers import queue import sys import threading from typing import Any, AnyStr, Dict, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: global q if not ident in q: q[ident] = multiprocessing.Queue() return q[ident] q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict() delattr(QueueManager, 'get_queue') def init_queue_manager_server(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue', get_queue) def serve(no: int, term_ev: threading.Event): manager: QueueManager with QueueManager(authkey=QueueManager.__name__.encode()) as manager: print(f"Server address {no}: {manager.address}") while not term_ev.is_set(): try: item: Any = manager.get_queue().get(timeout=0.1) print(f"Client {no}: {item} from {manager.address}") except queue.Empty: continue async def main(n: int): init_queue_manager_server() term_ev: threading.Event = threading.Event() executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor() i: int for i in range(n): asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev)) # Gracefully shut down try: await asyncio.get_running_loop().create_future() except asyncio.CancelledError: term_ev.set() executor.shutdown() raise if __name__ == '__main__': asyncio.run(main(int(sys.argv[1])))
Cliente:
multiprocessing-queue-manager-client.py
import multiprocessing import multiprocessing.managers import os import sys from typing import AnyStr, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass delattr(QueueManager, 'get_queue') def init_queue_manager_client(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue') def main(): init_queue_manager_client() manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode()) manager.connect() message = f"A message from {os.getpid()}" print(f"Message to send: {message}") manager.get_queue().put(message) if __name__ == '__main__': main()
Uso
Servidor:
N
es un número entero que indica cuántos servidores se deben crear. Copie uno de los<server-address-N>
resultados del servidor y conviértalo en el primer argumento de cada unomultiprocessing-queue-manager-client.py
.Cliente:
python3 multiprocessing-queue-manager-client.py <server-address-1>
Resultado
Servidor:
Client 1: <item> from <server-address-1>
Gist: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD : Creé un paquete aquí .
Servidor:
import ipcq with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server: server.get_queue().get()
Cliente:
import ipcq client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) client.get_queue().put('a message')
fuente