¿Llamada de método asincrónico en Python?

178

Me preguntaba si hay alguna biblioteca para llamadas a métodos asincrónicos en Python . Sería genial si pudieras hacer algo como

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

O para llamar a una rutina no asíncrona asincrónicamente

def longComputation()
    <code>

token = asynccall(longComputation())

Sería genial tener una estrategia más refinada como nativa en el núcleo del lenguaje. ¿Se consideró esto?

Stefano Borini
fuente
A partir de Python 3.4: docs.python.org/3/library/asyncio.html (hay un backport para 3.3 y brillante nuevo asyncy awaitsintaxis desde 3.5).
jonrsharpe
No existe un mecanismo de devolución de llamada, pero puede agregar resultados en un diccionario y se basa en el módulo de multiprocesamiento de Python. Estoy seguro de que puede agregar un parámetro más a la función decorada como devolución de llamada. github.com/alex-sherman/deco .
RajaRaviVarma
Para empezar. Documentación oficial - docs.python.org/3/library/concurrency.html
Adarsh ​​Madrecha

Respuestas:

141

Puede usar el módulo de multiprocesamiento agregado en Python 2.6. Puede usar grupos de procesos y luego obtener resultados de forma asincrónica con:

apply_async(func[, args[, kwds[, callback]]])

P.ej:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.

Esta es solo una alternativa. Este módulo proporciona muchas facilidades para lograr lo que desea. También será muy fácil hacer un decorador a partir de esto.

Lucas S.
fuente
55
Lucas S., tu ejemplo no funciona, desafortunadamente. La función de devolución de llamada nunca se llama.
DataGreed
66
Probablemente valga la pena tener en cuenta que esto genera procesos separados en lugar de hilos separados dentro de un proceso. Esto podría tener algunas implicaciones.
user47741
11
Esto funciona: resultado = pool.apply_async (f, [10], devolución de llamada = terminar)
MJ
66
Para hacer realmente cualquier cosa de forma asíncrona en Python, se requiere el uso del módulo de multiprocesamiento para generar nuevos procesos. La simple creación de nuevos subprocesos todavía está a merced del Bloqueo global del intérprete que evita que un proceso de Python haga varias cosas a la vez.
Drahkar
2
En caso de que no desee generar un nuevo proceso mientras usa esta solución, cambie la importación a from multiprocessing.dummy import Pool. multiprocessing.dummy tiene exactamente el mismo comportamiento implementado sobre hilos en lugar de procesos
Almog Cohen
203

Algo como:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

Consulte la documentación en https://docs.python.org/library/threading.html para obtener más detalles.

Drakosha
fuente
1
Sí, si solo necesitas hacer las cosas de forma asincrónica, ¿por qué no usar hilo? después de todo hilo es de peso ligero de proceso
kk1957
22
Nota importante: la implementación estándar (CPython) de subprocesos no ayudará con las tareas vinculadas a cómputo, debido al "Bloqueo global del intérprete". Ver el documento de la biblioteca: link
solublefish
3
¿Está usando thread.join () realmente asíncrono? ¿Qué sucede si no desea bloquear un subproceso (por ejemplo, un subproceso de interfaz de usuario) y no utilizar una tonelada de recursos haciendo un ciclo while en él?
Mgamerz
1
La unión de @Mgamerz es sincrónica. Puede dejar que el hilo coloque los resultados de la ejecución en alguna cola, y / y llamar a una devolución de llamada. De lo contrario, no sabe cuándo se hace (si es que lo hace).
Drakosha
1
¿Es posible llamar a una función de devolución de llamada al final de la ejecución del hilo, como se puede ver con multiprocessing.Pool
Reda Drissi
49

A partir de Python 3.5, puede usar generadores mejorados para funciones asíncronas.

import asyncio
import datetime

Sintaxis mejorada del generador:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

Nueva async/awaitsintaxis:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
camabeh
fuente
8
@carnabeh, ¿podría extender ese ejemplo para incluir la función "def longComputation ()" del OP? La mayoría de los ejemplos usan "await asyncio.sleep (1)", pero si el longComputation () regresa, digamos, un doble, no puede simplemente usar "await longComputation ()".
Fabuloso
Diez años en el futuro y esta debería ser la respuesta aceptada ahora. Cuando habla de asíncrono en python3.5 +, lo que viene a la mente debe ser asincio y palabra clave asíncrona.
zeh
31

No está en el núcleo del lenguaje, pero una biblioteca muy madura que hace lo que quieres es Twisted . Introduce el objeto diferido, al que puede adjuntar devoluciones de llamada o controladores de errores ("errbacks"). Un aplazado es básicamente una "promesa" de que una función tendrá un resultado eventualmente.

Meredith L. Patterson
fuente
1
En particular, mire twisted.internet.defer ( twistedmatrix.com/documents/8.2.0/api/… ).
Nicholas Riley
21

Puede implementar un decorador para que sus funciones sean asíncronas, aunque eso es un poco complicado. Sin multiprocessingembargo, el módulo está lleno de pequeños caprichos y restricciones aparentemente arbitrarias, una razón más para encapsularlo detrás de una interfaz amigable.

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send

El siguiente código ilustra el uso del decorador:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

En un caso del mundo real, trabajaría un poco más en el decorador, proporcionando alguna forma de desactivarlo para la depuración (manteniendo la interfaz futura en su lugar), o tal vez una instalación para manejar excepciones; pero creo que esto demuestra el principio lo suficientemente bien.

xperroni
fuente
Esta debería ser la mejor respuesta. Me encanta cómo puede devolver valor. No como el hilo que simplemente se ejecuta de forma asincrónica.
Aminah Nuraini
16

Sólo

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
Antigluk
fuente
8

Podrías usar eventlet. Le permite escribir lo que parece ser un código síncrono, pero que funcione de manera asíncrona a través de la red.

Aquí hay un ejemplo de un rastreador súper mínimo:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)
Raj
fuente
7

Mi solución es:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

Y funciona exactamente como se solicitó:

@Async
def fnc():
    pass
Nevyn
fuente
5

Algo como esto funciona para mí, puede llamar a la función y se enviará a un nuevo hilo.

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return
Nicholas Hamilton
fuente
2

¿Hay alguna razón para no usar hilos? Puedes usar la threadingclase. En lugar de la finished()función, use el isAlive(). La result()función podría join()el hilo y recuperar el resultado. Y, si puede, anule las funciones run()y __init__para llamar a la función especificada en el constructor y guarde el valor en algún lugar en la instancia de la clase.

ondra
fuente
2
Si se trata de una función computacionalmente costosa, el subproceso no te dará nada (probablemente hará las cosas más lentas en realidad) ya que un proceso de Python se limita a un núcleo de CPU debido a la GIL.
Kurt
2
@Kurt, si bien es cierto, el OP no mencionó que el rendimiento era su preocupación. Hay otras razones para querer un comportamiento asincrónico ...
Peter Hansen
Los subprocesos en python no son excelentes cuando desea tener la opción de eliminar la llamada al método asincrónico, ya que solo el subproceso principal en python recibe señales.
CivFan
2

Puede usar concurrent.futures (agregado en Python 3.2).

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')
Calabaza grande
fuente
Esta es una gran respuesta, ya que es la única aquí que ofrece la posibilidad de un conjunto de subprocesos con devoluciones de llamada
Reda Drissi
Desafortunadamente, esto también sufre del "Bloqueo Global de Intérpretes". Ver la biblioteca doc: enlace . Probado con Python 3.7
Alex
0

Puedes usar el proceso. Si desea ejecutarlo para siempre, use while (como redes) en su función:

from multiprocessing import Process
def foo():
    while 1:
        # Do something

p = Process(target = foo)
p.start()

si solo quieres ejecutarlo una vez, hazlo así:

from multiprocessing import Process
def foo():
    # Do something

p = Process(target = foo)
p.start()
p.join()
Keivan
fuente