El teclado interrumpe con el grupo de multiprocesamiento de Python

136

¿Cómo puedo manejar los eventos de KeyboardInterrupt con los pools de multiprocesamiento de Python? Aquí hay un ejemplo simple:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Cuando ejecuto el código anterior, KeyboardInterruptaparece cuando presiono ^C, pero el proceso simplemente se bloquea en ese punto y tengo que eliminarlo externamente.

Quiero poder presionar ^Cen cualquier momento y hacer que todos los procesos salgan con gracia.

Fragsworth
fuente
Resolví mi problema usando psutil, puedes ver la solución aquí: stackoverflow.com/questions/32160054/…
Tiago Albineli Motta

Respuestas:

137

Este es un error de Python. Al esperar una condición en threading.Condition.wait (), KeyboardInterrupt nunca se envía. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

La excepción KeyboardInterrupt no se entregará hasta que wait () regrese, y nunca regresa, por lo que la interrupción nunca ocurre. KeyboardInterrupt seguramente debería interrumpir una condición de espera.

Tenga en cuenta que esto no sucede si se especifica un tiempo de espera; cond.wait (1) recibirá la interrupción inmediatamente. Entonces, una solución es especificar un tiempo de espera. Para hacer eso, reemplace

    results = pool.map(slowly_square, range(40))

con

    results = pool.map_async(slowly_square, range(40)).get(9999999)

o similar.

Glenn Maynard
fuente
3
¿Está este error en el rastreador oficial de Python en alguna parte? Tengo problemas para encontrarlo, pero probablemente no estoy usando los mejores términos de búsqueda.
Joseph Garvin
18
Este error se ha archivado como [Issue 8296] [1]. [1]: bugs.python.org/issue8296
Andrey Vlasovskikh
1
Aquí hay un truco que corrige pool.imap () de la misma manera, haciendo posible Ctrl-C al iterar sobre imap. Capture la excepción y llame a pool.terminate () y su programa se cerrará. gist.github.com/626518
Alexander Ljungberg
66
Esto no soluciona las cosas. A veces obtengo el comportamiento esperado cuando presiono Control + C, otras veces no. No estoy seguro de por qué, pero parece que tal vez The KeyboardInterrupt es recibido por uno de los procesos al azar, y solo obtengo el comportamiento correcto si el proceso principal es el que lo detecta.
Ryan C. Thompson el
66
Esto no funciona para mí con Python 3.6.1 en Windows. Obtengo toneladas de rastros de pila y otra basura cuando hago Ctrl-C, es decir, lo mismo que sin esa solución. De hecho ninguna de las soluciones que he tratado de este hilo parecen funcionar ...
SZX
56

Por lo que he encontrado recientemente, la mejor solución es configurar los procesos de trabajo para ignorar SIGINT por completo y limitar todo el código de limpieza al proceso padre. Esto soluciona el problema para los procesos de trabajo inactivo y ocupado, y no requiere código de manejo de errores en sus procesos secundarios.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

La explicación y el código de ejemplo completo se pueden encontrar en http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ y http://github.com/jreese/multiprocessing-keyboardinterrupt respectivamente.

John Reese
fuente
44
Hola John. Su solución no logra lo mismo que mi, sí, lamentablemente complicada, solución. Se esconde detrás del time.sleep(10)proceso principal. Si tuviera que eliminar esa suspensión, o si espera hasta que el proceso intente unirse al grupo, lo que tiene que hacer para garantizar que se completen los trabajos, entonces todavía sufre el mismo problema que es el proceso principal. No reciba el KeyboardInterrupt mientras espera una joinoperación de sondeo .
bboe
En el caso de que usara este código en producción, time.sleep () formaba parte de un bucle que verificaría el estado de cada proceso secundario y luego reiniciaría ciertos procesos con retraso si fuera necesario. En lugar de join () que esperaría a que se completaran todos los procesos, los verificaría individualmente, asegurando que el proceso maestro permaneciera receptivo.
John Reese
2
Entonces, ¿fue más una espera ocupada (tal vez con pequeños descansos entre cheques) que se encuestó para completar el proceso a través de otro método en lugar de unirse? Si ese es el caso, quizás sería mejor incluir este código en la publicación de su blog, ya que puede garantizar que todos los trabajadores hayan completado antes de intentar unirse.
bboe
44
Esto no funciona Solo los niños reciben la señal. El padre nunca lo recibe, por lo que pool.terminate()nunca se ejecuta. Hacer que los niños ignoren la señal no logra nada. La respuesta de @ Glenn resuelve el problema.
Cerin
1
Mi versión de esto está en gist.github.com/admackin/003dd646e5fadee8b8d6 ; no llama .join()excepto en caso de interrupción: simplemente verifica manualmente el resultado del .apply_async()uso AsyncResult.ready()para ver si está listo, lo que significa que hemos terminado limpiamente.
Andy MacKinlay
29

Por algunas razones, solo las excepciones heredadas de la Exceptionclase base se manejan normalmente. Como solución alternativa, puede volver a subir su KeyboardInterruptcomo una Exceptioninstancia:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalmente obtendría el siguiente resultado:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Entonces, si golpeas ^C, obtendrás:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Andrey Vlasovskikh
fuente
2
Parece que esta no es una solución completa. Si KeyboardInterruptse llega a mientras multiprocessingrealiza su propio intercambio de datos IPC, entonces try..catchno se activará (obviamente).
Andrey Vlasovskikh
Podrías reemplazarlo raise KeyboardInterruptErrorpor a return. Solo tiene que asegurarse de que el proceso secundario finalice tan pronto como se reciba KeyboardInterrupt. El valor de retorno parece ser ignorado, mainaún así se recibe KeyboardInterrupt.
Bernhard
8

Por lo general, esta estructura simple funciona para Ctrl- Cen Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Como se indicó en algunas publicaciones similares:

Capture la interrupción del teclado en Python sin try-except

igco
fuente
1
Esto debería hacerse también en cada uno de los procesos de trabajo, y aún puede fallar si se levanta KeyboardInterrupt mientras se inicializa la biblioteca de multiprocesamiento.
MarioVilas
7

La respuesta votada no aborda el problema central sino un efecto secundario similar.

Jesse Noller, el autor de la biblioteca de multiprocesamiento, explica cómo tratar correctamente CTRL + C cuando se usa multiprocessing.Poolen una publicación de blog anterior .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
noxdafox
fuente
Descubrí que ProcessPoolExecutor también tiene el mismo problema. La única solución que pude encontrar fue llamar os.setpgrp()desde dentro del futuro
portforwardpodcast
1
Claro, la única diferencia es que ProcessPoolExecutorno admite funciones de inicializador. En Unix, puede aprovechar la forkestrategia deshabilitando el sighandler en el proceso principal antes de crear el Pool y volver a habilitarlo después. En guijarro , silencio SIGINTen los procesos secundarios de forma predeterminada. No conozco la razón por la que no hacen lo mismo con los Python Pools. Al final, el usuario podría volver a configurar el SIGINTcontrolador en caso de que quiera lastimarse.
noxdafox
Esta solución parece evitar que Ctrl-C interrumpa también el proceso principal.
Paul Price
1
Acabo de probar en Python 3.5 y funciona, ¿qué versión de Python estás usando? Que sistema operativo
noxdafox
5

Parece que hay dos problemas que hacen excepciones mientras que el multiprocesamiento es molesto. El primero (señalado por Glenn) es que debe usar map_asynccon un tiempo de espera en lugar de mappara obtener una respuesta inmediata (es decir, no termine de procesar la lista completa). El segundo (señalado por Andrey) es que el multiprocesamiento no captura excepciones que no heredan de Exception(por ejemplo, SystemExit). Así que aquí está mi solución que trata con ambos:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Paul Price
fuente
1
No he notado ninguna penalización de rendimiento, pero en mi caso functiones bastante duradera (cientos de segundos).
Paul Price
En realidad, este ya no es el caso, al menos desde mi punto de vista y experiencia. Si detecta la excepción del teclado en los procesos secundarios individuales y la captura una vez más en el proceso principal, puede continuar usando mapy todo está bien. @Linux Cli Aikproporcionó una solución a continuación que produce este comportamiento. map_asyncNo siempre se desea usar si el hilo principal depende de los resultados de los procesos secundarios.
Código Doggo
4

Descubrí que, por el momento, la mejor solución es no usar la función multiprocessing.pool sino más bien rodar la funcionalidad de su propio pool. Proporcioné un ejemplo que demuestra el error con apply_async, así como un ejemplo que muestra cómo evitar el uso de la funcionalidad del grupo por completo.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

bboe
fuente
Funciona de maravilla. Es una solución limpia y no una especie de pirateo (/ me piensa). Por cierto, el truco con .get (99999) según lo propuesto por otros perjudica mucho el rendimiento.
Walter
No he notado ninguna penalización de rendimiento por usar un tiempo de espera, aunque he estado usando 9999 en lugar de 999999. La excepción es cuando se genera una excepción que no hereda de la clase Excepción: entonces debe esperar hasta que se agote el tiempo de espera golpear. La solución a eso es atrapar todas las excepciones (ver mi solución).
Paul Price
1

Soy un novato en Python. Estaba buscando respuestas en todas partes y me topé con este y algunos otros blogs y videos de YouTube. He intentado copiar y pegar el código del autor anterior y reproducirlo en mi Python 2.7.13 en Windows 7 de 64 bits. Está cerca de lo que quiero lograr.

Hice que mi hijo procese para ignorar el ControlC y hacer que el proceso padre finalice. Parece que pasar por alto el proceso secundario me evita este problema.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

La parte que comienza en pool.terminate()nunca parece ejecutarse.

Linux Cli Aik
fuente
¡También me di cuenta de esto también! Sinceramente, creo que esta es la mejor solución para un problema como este. La solución aceptada obliga map_asyncal usuario, lo que no me gusta particularmente. En muchas situaciones, como la mía, el hilo principal debe esperar a que finalicen los procesos individuales. ¡Esta es una de las razones por las que mapexiste!
Código Doggo
1

Puede intentar usar el método apply_async de un objeto Pool, como este:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Salida:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Una ventaja de este método es que los resultados procesados ​​antes de la interrupción serán devueltos en el diccionario de resultados:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
bparker856
fuente
Ejemplo glorioso y completo
eMTy
-5

Por extraño que parezca, también tienes que ocuparte KeyboardInterruptde los niños. Hubiera esperado que esto funcionara tal como está escrito ... intente cambiar slowly_squarea:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Eso debería funcionar como esperabas.

D.Shawley
fuente
1
Intenté esto, y en realidad no termina todo el conjunto de trabajos. Termina los trabajos que se ejecutan actualmente, pero el script aún asigna los trabajos restantes en la llamada pool.map como si todo fuera normal.
Fragsworth
Esto está bien, pero puede perder la noción de los errores que ocurren. devolver el error con un seguimiento de pila podría funcionar para que el proceso principal pueda decir que ocurrió un error, pero aún así no se cierra inmediatamente cuando ocurre el error.
mehtunguh