¿Cómo usar la cola de multiprocesamiento en Python?

94

Tengo muchos problemas para tratar de entender cómo funciona la cola de multiprocesamiento en Python y cómo implementarla. Digamos que tengo dos módulos de Python que acceden a datos desde un archivo compartido, llamemos a estos dos módulos un escritor y un lector. Mi plan es que tanto el lector como el escritor coloquen solicitudes en dos colas de multiprocesamiento separadas, y luego hacer que un tercer proceso muestre estas solicitudes en un bucle y las ejecute como tal.

Mi principal problema es que realmente no sé cómo implementar multiprocessing.queue correctamente, realmente no puede instanciar el objeto para cada proceso ya que serán colas separadas, ¿cómo se asegura de que todos los procesos se relacionen con una cola compartida (o en este caso, colas)

pinchazo
fuente
4
pasar las colas a cada clase de proceso como un parámetro cuando las instancia en el proceso padre.
Joel Cornett

Respuestas:

122

Mi principal problema es que realmente no sé cómo implementar multiprocessing.queue correctamente, realmente no puede instanciar el objeto para cada proceso ya que serán colas separadas, ¿cómo se asegura de que todos los procesos se relacionen con una cola compartida (o en este caso, colas)

Este es un ejemplo simple de un lector y un escritor que comparten una sola cola ... El escritor envía un montón de números enteros al lector; cuando el escritor se queda sin números, envía 'DONE', lo que le permite al lector saber que debe salir del ciclo de lectura.

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))
Mike Pennington
fuente
13
Gran ejemplo. Solo como un poco de información adicional para abordar la confusión del OP ... Este ejemplo muestra que una cola compartida debe originarse en el proceso maestro, que luego se pasa a todos sus subprocesos. Para que dos procesos completamente no relacionados compartan datos, deben comunicarse a través de algún dispositivo de red central o asociado (sockets, por ejemplo). Algo tiene que coordinar la información.
jdi
5
buen ejemplo ... también soy nuevo en este tema ... si tengo varios procesos que ejecutan la misma función de destino (con diferentes argumentos), cómo asegurarse de que no entren en conflicto al poner los datos en la cola ... es necesario bloquear ?
WYSIWYG
@bharat_iyengar De la documentación del módulo de multiprocesamiento, dice que Queue se implementa usando algunos bloqueos / semáforos. Entonces, cuando usa los métodos de cola get () y put (object), la cola se bloqueará si algún otro proceso / subproceso está intentando obtener o poner algo en la cola. Por lo tanto, no tiene que preocuparse por bloquearlo manualmente.
almel
1
Las condiciones de parada explícitas son mejores que las condiciones de parada implícitas
Mike Pennington
2
Qsize puede ir a cero si los lectores de la cola superan la tasa del escritor de la cola
Mike Pennington
8

en " from queue import Queue" no hay ningún módulo llamado queue, en su lugar multiprocessingdebe usarse. Por lo tanto, debería verse como " from multiprocessing import Queue"

Vaquero
fuente
11
Aunque multiprocessing.Queuelleva años, el uso es correcto. Lo normal Queue.Queuese usa para hilos de Python . Cuando intente utilizar Queue.Queuecon multiprocesamiento, se crearán copias del objeto Cola en cada proceso secundario y los procesos secundarios nunca se actualizarán. Básicamente, Queue.Queuefunciona con un objeto compartido global y multiprocessing.Queuefunciona con IPC. Ver: stackoverflow.com/questions/925100/…
Michael Guffre
5

Aquí hay un uso muy simple de multiprocessing.Queuey multiprocessing.Processque permite a las personas que llaman enviar un "evento" más argumentos a un proceso separado que envía el evento a un método "do_" en el proceso. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
       msg = Msg(event, args)
       self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Uso:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

El sendsucede en el proceso principal, la do_*que ocurre en el proceso hijo.

Dejé fuera cualquier manejo de excepciones que obviamente interrumpiría el ciclo de ejecución y saldría del proceso hijo. También puede personalizarlo anulando runpara controlar el bloqueo o cualquier otra cosa.

Esto realmente solo es útil en situaciones en las que tiene un solo proceso de trabajo, pero creo que es una respuesta relevante a esta pregunta para demostrar un escenario común con un poco más de orientación a objetos.

Joe Holloway
fuente
1
¡Excelente respuesta! Gracias. +50 :)
kmiklas
3

Eché un vistazo a varias respuestas en el desbordamiento de pila y en la web mientras intentaba configurar una forma de hacer multiprocesamiento utilizando colas para pasar grandes marcos de datos de pandas. Me pareció que cada respuesta estaba reiterando el mismo tipo de soluciones sin tener en cuenta la multitud de casos extremos que uno definitivamente encontrará al configurar cálculos como estos. El problema es que hay muchas cosas en juego al mismo tiempo. El número de tareas, el número de trabajadores, la duración de cada tarea y las posibles excepciones durante la ejecución de la tarea. Todos estos hacen que la sincronización sea complicada y la mayoría de las respuestas no abordan cómo puede hacerlo. Así que esta es mi opinión después de jugar durante unas horas, espero que sea lo suficientemente genérico para que la mayoría de la gente lo encuentre útil.

Algunas reflexiones antes de cualquier ejemplo de codificación. Dado que queue.Emptyo queue.qsize()cualquier otro método similar no es confiable para el control de flujo, cualquier código similar

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

es falso. Esto matará al trabajador incluso si milisegundos más tarde aparece otra tarea en la cola. El trabajador no se recuperará y después de un tiempo TODOS los trabajadores desaparecerán ya que al azar encuentran la cola momentáneamente vacía. El resultado final será que la función principal de multiprocesamiento (la que tiene la combinación () en los procesos) regresará sin que se hayan completado todas las tareas. Agradable. Buena suerte depurando eso si tienes miles de tareas y faltan algunas.

El otro problema es el uso de valores centinela. Mucha gente ha sugerido agregar un valor centinela en la cola para marcar el final de la cola. Pero para señalarlo a quién exactamente? Si hay N trabajadores, asumiendo que N es el número de núcleos disponibles más o menos, entonces un solo valor centinela solo marcará el final de la cola a un trabajador. Todos los demás trabajadores se quedarán sentados esperando más trabajo cuando no quede ninguno. Los ejemplos típicos que he visto son

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

Un trabajador obtendrá el valor centinela mientras que el resto esperará indefinidamente. Ninguna publicación con la que me encontré mencionó que debe enviar el valor centinela a la cola AL MENOS tantas veces como trabajadores para que TODOS lo obtengan.

El otro problema es el manejo de excepciones durante la ejecución de la tarea. Nuevamente, estos deben ser capturados y manejados. Además, si tiene una completed_taskscola, debe contar de forma independiente y determinista cuántos elementos hay en la cola antes de decidir que el trabajo está terminado. De nuevo, depender del tamaño de las colas está destinado a fallar y arrojar resultados inesperados.

En el siguiente ejemplo, la par_proc()función recibirá una lista de tareas que incluye las funciones con las que estas tareas deben ejecutarse junto con los argumentos y valores nombrados.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None


def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})


def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results

Y aquí hay una prueba para ejecutar el código anterior contra

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21

más otro con algunas excepciones

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21

Espero que sea de ayuda.

Nick B.
fuente
2

Implementamos dos versiones de esto, una simple agrupación de múltiples subprocesos que puede ejecutar muchos tipos de invocables, lo que nos hace la vida mucho más fácil y la segunda versión que usa procesos , que es menos flexible en términos de invocables y requiere una llamada adicional a eneldo.

Establecer frozen_pool en true congelará la ejecución hasta que se llame a finish_pool_queue en cualquiera de las clases.

Versión del hilo:

'''
Created on Nov 4, 2019

@author: Kevin
'''
from threading import Lock, Thread
from Queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os

class ThreadPool(object):
    def __init__(self, queue_threads, *args, **kwargs):
        self.frozen_pool = kwargs.get('frozen_pool', False)
        self.print_queue = kwargs.get('print_queue', True)
        self.pool_results = []
        self.lock = Lock()
        self.queue_threads = queue_threads
        self.queue = Queue()
        self.threads = []

        for i in range(self.queue_threads):
            t = Thread(target=self.make_pool_call)
            t.daemon = True
            t.start()
            self.threads.append(t)

    def make_pool_call(self):
        while True:
            if self.frozen_pool:
                #print '--> Queue is frozen'
                sleep(1)
                continue

            item = self.queue.get()
            if item is None:
                break

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.lock.acquire()
                    self.pool_results.append((item, result))
                    self.lock.release()

            except Exception as e:
                self.lock.acquire()
                print e
                traceback.print_exc()
                self.lock.release()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self):
        self.frozen_pool = False

        while self.queue.unfinished_tasks > 0:
            if self.print_queue:
                print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
            sleep(5)

        self.queue.join()

        for i in range(self.queue_threads):
            self.queue.put(None)

        for t in self.threads:
            t.join()

        del self.threads[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]

Versión de proceso:

  '''
Created on Nov 4, 2019

@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
    RawArray, Manager
from dill import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc

class ProcessPool(object):
    def __init__(self, queue_processes, *args, **kwargs):
        self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
        self.print_queue = kwargs.get('print_queue', True)
        self.manager = Manager()
        self.pool_results = self.manager.list()
        self.queue_processes = queue_processes
        self.queue = JoinableQueue()
        self.processes = []

        for i in range(self.queue_processes):
            p = Process(target=self.make_pool_call)
            p.start()
            self.processes.append(p)

        print 'Processes', self.queue_processes

    def make_pool_call(self):
        while True:
            if self.frozen_pool.value:
                sleep(1)
                continue

            item_pickled = self.queue.get()

            if item_pickled is None:
                #print '--> Ending'
                self.queue.task_done()
                break

            item = dill.loads(item_pickled)

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.pool_results.append(dill.dumps((item, result)))
                else:
                    del call, args, kwargs, keep_results, item, result

            except Exception as e:
                print e
                traceback.print_exc()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self, callable=None):
        self.frozen_pool.value = False

        while self.queue._unfinished_tasks.get_value() > 0:
            if self.print_queue:
                print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))

            if callable:
                callable()

            sleep(5)

        for i in range(self.queue_processes):
            self.queue.put(None)

        self.queue.join()
        self.queue.close()

        for p in self.processes:
            with ignore_exception: p.join(10)
            with ignore_exception: p.terminate()

        with ignore_exception: del self.processes[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]
def test(eg):
        print 'EG', eg

Llame con:

tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()

o

pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()
Kevin Parker
fuente
0

Acabo de hacer un ejemplo simple y general para demostrar el paso de un mensaje a través de una cola entre 2 programas independientes. No responde directamente a la pregunta del OP, pero debería ser lo suficientemente claro como para indicar el concepto.

Servidor:

multiprocessing-queue-manager-server.py

import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
    global q

    if not ident in q:
        q[ident] = multiprocessing.Queue()

    return q[ident]


q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')


def init_queue_manager_server():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue', get_queue)


def serve(no: int, term_ev: threading.Event):
    manager: QueueManager
    with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
        print(f"Server address {no}: {manager.address}")

        while not term_ev.is_set():
            try:
                item: Any = manager.get_queue().get(timeout=0.1)
                print(f"Client {no}: {item} from {manager.address}")
            except queue.Empty:
                continue


async def main(n: int):
    init_queue_manager_server()
    term_ev: threading.Event = threading.Event()
    executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()

    i: int
    for i in range(n):
        asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))

    # Gracefully shut down
    try:
        await asyncio.get_running_loop().create_future()
    except asyncio.CancelledError:
        term_ev.set()
        executor.shutdown()
        raise


if __name__ == '__main__':
    asyncio.run(main(int(sys.argv[1])))

Cliente:

multiprocessing-queue-manager-client.py

import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


delattr(QueueManager, 'get_queue')


def init_queue_manager_client():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue')


def main():
    init_queue_manager_client()

    manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
    manager.connect()

    message = f"A message from {os.getpid()}"
    print(f"Message to send: {message}")
    manager.get_queue().put(message)


if __name__ == '__main__':
    main()

Uso

Servidor:

$ python3 multiprocessing-queue-manager-server.py N

Nes un número entero que indica cuántos servidores se deben crear. Copie uno de los <server-address-N>resultados del servidor y conviértalo en el primer argumento de cada uno multiprocessing-queue-manager-client.py.

Cliente:

python3 multiprocessing-queue-manager-client.py <server-address-1>

Resultado

Servidor:

Client 1: <item> from <server-address-1>

Gist: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5


UPD : Creé un paquete aquí .

Servidor:

import ipcq


with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server:
    server.get_queue().get()

Cliente:

import ipcq


client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT)
client.get_queue().put('a message')

ingrese la descripción de la imagen aquí

changyuheng
fuente