Tiempo de espera en una llamada de función

300

Estoy llamando a una función en Python que sé que puede detenerse y obligarme a reiniciar el script.

¿Cómo llamo a la función o en qué la envuelvo para que, si tarda más de 5 segundos, el script la cancela y hace algo más?

Teifion
fuente

Respuestas:

227

Puede usar el paquete de señal si está ejecutando en UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 segundos después de la llamada alarm.alarm(10), se llama al controlador. Esto genera una excepción que puede interceptar del código normal de Python.

Este módulo no funciona bien con hilos (pero, ¿quién lo hace?)

Tenga en cuenta que, dado que activamos una excepción cuando se agota el tiempo de espera, puede terminar atrapada e ignorada dentro de la función, por ejemplo, una de esas funciones:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
piro
fuente
55
Yo uso Python 2.5.4. Existe dicho error: Traceback (última llamada más reciente): archivo "aa.py", línea 85, en func signal.signal (signal.SIGALRM, handler) AttributeError: el objeto 'módulo' no tiene atributo 'SIGALRM'
flypen
11
@flypen es porque signal.alarmy lo relacionado SIGALRMno está disponible en plataformas Windows.
Doble AA
2
Si hay muchos procesos, y cada uno llama signal.signal--- ¿funcionarán todos correctamente? ¿Cada signal.signalllamada no cancelará una "concurrente"?
Brownian
1
Advertencia para aquellos que deseen usar esto con una extensión C: el controlador de señal Python no se llamará hasta que la función C devuelva el control al intérprete de Python. Para este caso de uso, use la respuesta de ATOzTOA: stackoverflow.com/a/14924210/1286628
wkschwartz
13
Secundo la advertencia sobre hilos. signal.alarm solo funciona en el hilo principal. Traté de usar esto en las vistas de Django: falla inmediata con verborrea solo sobre el hilo principal.
JL Peyret
154

Puedes usar multiprocessing.Processpara hacer exactamente eso.

Código

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
ATOzTOA
fuente
36
¿Cómo puedo obtener el valor de retorno del método de destino?
bad_keypoints
44
Esto no parece funcionar si la función llamada se atasca en un bloque de E / S.
sudo
44
@bad_keypoints Vea esta respuesta: stackoverflow.com/a/10415215/1384471 Básicamente, pasa una lista en la que pone la respuesta.
Peter
1
@sudo luego elimine el join(). eso hace que su número x de subprocesos concurrentes se ejecuten hasta que terminen su trabajo, o la cantidad definida en join(10). En caso de que tenga una E / S de bloqueo para 10 procesos, utilizando join (10) los ha configurado para que esperen todos ellos como máximo 10 por CADA proceso que ha comenzado. Utilice el indicador de daemon como este ejemplo stackoverflow.com/a/27420072/2480481 . Por supuesto, puede pasar la bandera daemon=Truedirectamente a la multiprocessing.Process()función.
m3nda
2
@ATOzTOA El problema con esta solución, al menos para mis propósitos, es que potencialmente no permite que las huellas de los niños se limpien después de sí mismos. De la documentación de la función de terminaciónterminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
abalcerek
78

¿Cómo llamo a la función o en qué la envuelvo para que si tarda más de 5 segundos, el script la cancela?

Publiqué un esencia que resuelve esta pregunta / problema con un decorador y a threading.Timer. Aquí está con un desglose.

Importaciones y configuraciones para compatibilidad

Fue probado con Python 2 y 3. También debería funcionar en Unix / Linux y Windows.

Primero las importaciones. Estos intentan mantener el código consistente independientemente de la versión de Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Usar código independiente de la versión:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Ahora hemos importado nuestra funcionalidad de la biblioteca estándar.

exit_after decorador

A continuación, necesitamos una función para terminar el main()hilo secundario:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

Y aquí está el decorador en sí:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Uso

¡Y aquí está el uso que responde directamente a su pregunta sobre salir después de 5 segundos !:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Manifestación:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

¡La segunda llamada a función no finalizará, en su lugar, el proceso debería salir con un rastreo!

KeyboardInterrupt no siempre detiene un hilo dormido

Tenga en cuenta que el sueño no siempre será interrumpido por una interrupción del teclado, en Python 2 en Windows, por ejemplo:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

ni es probable que interrumpa el código que se ejecuta en extensiones a menos que lo compruebe explícitamente PyErr_CheckSignals(), vea Cython, Python y KeyboardInterrupt ignorado

En cualquier caso, evitaría dormir un hilo más de un segundo, eso es un eón en tiempo de procesador.

¿Cómo llamo a la función o en qué la envuelvo para que, si tarda más de 5 segundos, el script la cancela y hace algo más?

Para atraparlo y hacer otra cosa, puede atrapar el KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Aaron Hall
fuente
Todavía no leí toda tu publicación, pero me preguntaba: ¿qué pasa si la descarga es 0? Eso se interpretaría como Falso en la declaración if debajo, ¿verdad?
Koenraad van Duin
2
¿Por qué tengo que llamar thread.interrupt_main(), por qué no puedo plantear directamente una excepción?
Anirban Nag 'tintinmj'
¿Alguna idea de terminar multiprocessing.connection.Clientcon esto? - Intentando resolver: stackoverflow.com/questions/57817955/…
wwii
51

Tengo una propuesta diferente que es una función pura (con la misma API que la sugerencia de subprocesos) y parece funcionar bien (según las sugerencias de este hilo)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
Alex
fuente
3
También debe restaurar el controlador de señal original. Ver stackoverflow.com/questions/492519/…
Martin Konecny
99
Una nota más: el método de señal Unix solo funciona si lo está aplicando en el hilo principal. Aplicarlo en un subproceso arroja una excepción y no funcionará.
Martin Konecny
12
Esta no es la mejor solución porque solo funciona en Linux.
max
17
Max, no es cierto: funciona en cualquier Unix compatible con POSIX. Creo que tu comentario debería ser más preciso, no funciona en Windows.
Chris Johnson
66
Debe evitar establecer kwargs en un dict vacío. Un problema común de Python es que los argumentos predeterminados sobre las funciones son mutables. Para que ese diccionario se comparta en todas las llamadas a timeout. Es mucho mejor establecer el valor predeterminado Noney, en la primera línea de la función, agregar kwargs = kwargs or {}. Args está bien porque las tuplas no son mutables.
scottmrogowski
32

Me encontré con este hilo al buscar una llamada de tiempo de espera en las pruebas unitarias. No encontré nada simple en las respuestas o los paquetes de terceros, así que escribí el decorador a continuación, puede colocarlo en el código:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Entonces es tan simple como agotar el tiempo de espera de una prueba o cualquier función que desee:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Rico
fuente
14
¡Tenga cuidado ya que esto no termina la función después de que se alcanza el tiempo de espera!
Sylvain
Tenga en cuenta que en Windows, esto genera un proceso completamente nuevo, que consumirá el tiempo de espera, quizás mucho si las dependencias tardan mucho tiempo en configurarse.
Aaron Hall
1
Sí, esto necesita algunos ajustes. Deja hilos para siempre.
sudo
2
IDK si esta es la mejor manera, pero puede intentar / atrapar Exceptiondentro de func_wrapper y hacer pool.close()después de la captura para asegurarse de que el hilo siempre muere después, pase lo que pase . Entonces puedes lanzar TimeoutErroro lo que quieras después. Parece funcionar para mi.
sudo
2
Esto es útil, pero una vez que lo he hecho muchas veces, me sale RuntimeError: can't start new thread. ¿Funcionará si lo ignoro o hay algo más que pueda hacer para solucionar esto? ¡Gracias por adelantado!
Benjie
20

El stopitpaquete, que se encuentra en pypi, parece manejar bien los tiempos de espera.

Me gusta el @stopit.threading_timeoutabledecorador, que agrega un timeoutparámetro a la función decorada, que hace lo que espera, detiene la función.

Compruébalo en pypi: https://pypi.python.org/pypi/stopit

egeland
fuente
1
¡Es muy práctico y seguro para hilos! Gracias y más uno! ¡Esta es la mejor opción que he encontrado hasta ahora e incluso mejor que la respuesta aceptada!
Yahya
La biblioteca afirma que algunas funciones no funcionan en Windows.
Stefan Simik
16

Hay muchas sugerencias, pero ninguna con futuros concurrentes, que creo que es la forma más legible de manejar esto.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Súper simple de leer y mantener.

Hacemos un grupo, enviamos un solo proceso y luego esperamos hasta 5 segundos antes de generar un TimeoutError que pueda detectar y manejar según lo necesite.

Originario de python 3.2+ y con respaldo de 2.7 (futuros de instalación de pip).

Cambiar entre hilos y procesos es tan simple como reemplazarlo ProcessPoolExecutorporThreadPoolExecutor .

Si desea finalizar el proceso en el tiempo de espera, le sugiero que busque en Pebble .

Brian
fuente
2
¿Qué significa "Advertencia: esto no termina la función si se agota el tiempo de espera"?
Scott Stafford
55
@ScottStafford Los procesos / subprocesos no finalizan solo porque se ha generado un TimeoutError. Por lo tanto, el proceso o el hilo seguirán intentando ejecutarse hasta su finalización y no le devolverán automáticamente el control en su tiempo de espera.
Brian
¿Esto me permitiría guardar resultados intermedios en ese momento? por ejemplo, si tengo una función recursiva que configuré el tiempo de espera en 5, y en ese momento tengo resultados parciales, ¿cómo escribo la función para devolver los resultados parciales en el tiempo de espera?
SumNeuron
Estoy usando exactamente esto, sin embargo, tengo 1000 tareas, cada una se permite 5 segundos antes del tiempo de espera. Mi problema es que los núcleos se atascan en tareas que nunca terminan porque el tiempo de espera solo se aplica en el total de tareas, no en tareas individuales. concurrent.futures no proporciona una solución a este afaik.
Bastiaan
12

Excelente, fácil de usar y confiable decorador de tiempo de espera del proyecto PyPi ( https://pypi.org/project/timeout-decorator/ )

instalación :

pip install timeout-decorator

Uso :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
Gil
fuente
2
Agradezco la solución clara. Pero, ¿podría alguien explicar cómo funciona esta biblioteca, especialmente cuando se trata de subprocesos múltiples? Personalmente, temo usar un mecanismo desconocido para manejar hilos o señales.
wsysuper
@wsysuper the lib tiene 2 modos de operaciones: abrir un nuevo subproceso o un nuevo subproceso (que se supone que es seguro para subprocesos)
Gil
¡Esto funcionó muy bien para mí!
Florian Heigl hace
6

Soy el autor de wrapt_timeout_decorator

La mayoría de las soluciones presentadas aquí funcionan maravillosamente bajo Linux a primera vista, porque tenemos fork () y señales (), pero en Windows las cosas se ven un poco diferentes. Y cuando se trata de subprocesos en Linux, ya no puede usar señales.

Para generar un proceso en Windows, debe ser seleccionable, y muchas funciones decoradas o métodos de clase no lo son.

Por lo tanto, debe usar un mejor selector como eneldo y multiproceso (no encurtido y multiprocesamiento), es por eso que no puede usar ProcessPoolExecutor (o solo con funcionalidad limitada).

Para el tiempo de espera en sí mismo: debe definir qué significa el tiempo de espera, porque en Windows tomará un tiempo considerable (y no determinable) para generar el proceso. Esto puede ser complicado en tiempos de espera cortos. Supongamos que generar el proceso lleva aproximadamente 0,5 segundos (¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡¡!!! Si le das un tiempo de espera de 0.2 segundos, ¿qué debería pasar? ¿Debe la función agotar el tiempo de espera después de 0.5 + 0.2 segundos (así que deje que el método se ejecute durante 0.2 segundos)? ¿O debería agotar el tiempo del proceso llamado después de 0.2 segundos (en ese caso, la función decorada SIEMPRE expira, porque en ese tiempo ni siquiera se genera)?

También los decoradores anidados pueden ser desagradables y no se pueden usar señales en un subproceso. Si desea crear un decorador multiplataforma verdaderamente universal, todo esto debe tenerse en cuenta (y probarse).

Otros problemas son pasar excepciones a la persona que llama, así como problemas de registro (si se usa en la función decorada, el registro a archivos en otro proceso NO es compatible)

Traté de cubrir todos los casos extremos. Puede consultar el paquete wrapt_timeout_decorator, o al menos probar sus propias soluciones inspiradas en las pruebas unitarias utilizadas allí.

@Alexis Eggermont - desafortunadamente no tengo suficientes puntos para comentar - tal vez alguien más pueda notificarte - Creo que resolví tu problema de importación.

bitranox
fuente
3

timeout-decoratorno funciona en el sistema de Windows, ya que Windows no era compatible signal.

Si usa timeout-decorator en el sistema de Windows obtendrá lo siguiente

AttributeError: module 'signal' has no attribute 'SIGALRM'

Algunos sugirieron usar use_signals=Falsepero no funcionaron para mí.

El autor @bitranox creó el siguiente paquete:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Código de muestra:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Da la siguiente excepción:

TimeoutError: Function mytest timed out after 5 seconds
como si
fuente
Esto suena como una muy buena solución. Curiosamente, la línea from wrapt_timeout_decorator import * parece matar algunas de mis otras importaciones. Por ejemplo, obtengo ModuleNotFoundError: No module named 'google.appengine', pero no obtengo este error si no importo wrapt_timeout_decorator
Alexis Eggermont
@AlexisEggermont Estaba a punto de usar esto con un motor ... así que tengo mucha curiosidad si este error persistió.
PascalVKooten
2

Podemos usar señales para lo mismo. Creo que el siguiente ejemplo te será útil. Es muy simple en comparación con los hilos.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
Arkansas
fuente
1
Sería mejor elegir una excepción específica y capturarla solo. Desnudo try: ... except: ...siempre es una mala idea.
colmena
Estoy de acuerdo contigo hivert.
AR
Si bien entiendo la razón, como administrador de sistemas / integrador, no estoy de acuerdo: el código de Python es conocido por descuidar el manejo de errores, y manejar lo que esperas no es lo suficientemente bueno para un software de calidad. puedes manejar las 5 cosas que planeas Y una estrategia genérica para otras cosas. "Traceback, None" no es una estrategia, es un insulto.
Florian Heigl hace
2
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
Hal Canary
fuente
77
Si bien este código puede responder la pregunta, proporcionar un contexto adicional sobre cómo y / o por qué resuelve el problema mejoraría el valor a largo plazo de la respuesta
Dan Cornilescu
1

Necesitaba interrupciones temporizadas anidables (que SIGALARM no puede hacer) que no se bloqueen con time.sleep (que el enfoque basado en hilos no puede hacer). Terminé copiando y modificando ligeramente el código desde aquí: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

El código en sí:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

y un ejemplo de uso:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
James
fuente
Esto también usa señal, por lo tanto, no funcionará si se llama desde un hilo.
garg10may
0

Aquí hay una ligera mejora en la solución basada en hilos dada.

El siguiente código admite excepciones :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Invocándolo con un tiempo de espera de 5 segundos:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
diemacht
fuente
1
Esto generará una nueva excepción ocultando el rastreo original. Vea mi versión a continuación ...
Meitham
1
Esto tampoco es seguro, como si dentro de runFunctionCatchExceptions()ciertas funciones de Python se llamara obtener GIL. Por ejemplo, lo siguiente sería nunca o por mucho tiempo, de vuelta si llama dentro de la función: eval(2**9999999999**9999999999). Ver stackoverflow.com/questions/22138190/…
Mikko Ohtamaa