¿Cómo funciona realmente asyncio?

119

Esta pregunta está motivada por mi otra pregunta: ¿Cómo esperar en cdef?

Hay toneladas de artículos y publicaciones de blogs en la web asyncio, pero todos son muy superficiales. No pude encontrar ninguna información sobre cómo asynciose implementa realmente y qué hace que la E / S sea asincrónica. Estaba tratando de leer el código fuente, pero son miles de líneas que no son del código C de mayor grado, muchas de las cuales tratan con objetos auxiliares, pero lo más importante es que es difícil conectarse entre la sintaxis de Python y el código C que traduciría. dentro.

La propia documentación de Asycnio es aún menos útil. No hay información sobre cómo funciona, solo algunas pautas sobre cómo usarlo, que a veces también son engañosas / están muy mal escritas.

Estoy familiarizado con la implementación de corrutinas de Go, y esperaba que Python hiciera lo mismo. Si ese fuera el caso, el código que aparecí en la publicación vinculada anteriormente habría funcionado. Como no fue así, ahora estoy tratando de averiguar por qué. Mi mejor suposición hasta ahora es la siguiente, corríjame donde me equivoque:

  1. Las definiciones de procedimiento del formulario en async def foo(): ...realidad se interpretan como métodos de una clase heredada coroutine.
  2. Quizás, en async defrealidad , se divide en varios métodos mediante awaitdeclaraciones, donde el objeto, en el que se llaman estos métodos, puede realizar un seguimiento del progreso realizado hasta el momento en la ejecución.
  3. Si lo anterior es cierto, entonces, esencialmente, la ejecución de una corrutina se reduce a llamar a los métodos de un objeto de rutina por algún administrador global (¿bucle?).
  4. El administrador global es de alguna manera (¿cómo?) Consciente de cuándo las operaciones de E / S son realizadas por el código Python (¿solo?) Y puede elegir uno de los métodos de corutina pendientes para ejecutar después de que el método de ejecución actual haya renunciado al control (presione en la awaitdeclaración ).

En otras palabras, aquí está mi intento de "desugaring" de alguna asynciosintaxis en algo más comprensible:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Si mi conjetura resulta correcta: entonces tengo un problema. ¿Cómo ocurre realmente la E / S en este escenario? ¿En un hilo separado? ¿Está suspendido todo el intérprete y la E / S ocurre fuera del intérprete? ¿Qué se entiende exactamente por E / S? Si mi procedimiento de Python llamó al procedimiento C open(), y a su vez envió una interrupción al kernel, cediéndole el control, ¿cómo sabe el intérprete de Python sobre esto y puede continuar ejecutando algún otro código, mientras que el código del kernel hace la E / S real y hasta ¿Despierta el procedimiento de Python que envió la interrupción originalmente? ¿Cómo puede el intérprete de Python, en principio, darse cuenta de que esto está sucediendo?

wvxvw
fuente
2
La mayor parte de la lógica la maneja la implementación del bucle de eventos. Mire cómo BaseEventLoopse implementa CPython : github.com/python/cpython/blob/…
Blender
@Blender ok, creo que finalmente encontré lo que quería, pero ahora no entiendo la razón por la que el código fue escrito como estaba. ¿Por qué se _run_oncehace "privada", que es en realidad la única función útil en todo este módulo? La implementación es horrible, pero eso es un problema menor. ¿Por qué la única función a la que querría llamar en un bucle de eventos está marcada como "no me llames"?
wvxvw
Esa es una pregunta para la lista de correo. ¿Qué caso de uso requeriría que tocara _run_onceen primer lugar?
Blender
8
Sin embargo, eso no responde realmente a mi pregunta. ¿Cómo resolvería cualquier problema útil usando solo _run_once? asyncioes complejo y tiene sus fallas, pero por favor mantenga la discusión civilizada. No hable mal de los desarrolladores detrás del código que usted mismo no comprende.
Blender
1
@ user8371915 Si cree que hay algo que no cubrí, puede agregar o comentar mi respuesta.
Bharel

Respuestas:

202

¿Cómo actúa asyncio?

Antes de responder a esta pregunta, debemos comprender algunos términos básicos, omítelos si ya conoce alguno de ellos.

Generadores

Los generadores son objetos que nos permiten suspender la ejecución de una función de Python. Los generadores seleccionados por el usuario se implementan utilizando la palabra clave yield. Al crear una función normal que contiene la yieldpalabra clave, convertimos esa función en un generador:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

Como puede ver, llamar next()al generador hace que el intérprete cargue el marco de prueba y devuelva el yieldvalor ed. Llamar de next()nuevo, hace que el marco se cargue de nuevo en la pila de intérpretes y continúe yieldcon otro valor.

A la tercera vez que next()se llama, nuestro generador se terminó y StopIterationse lanzó.

Comunicarse con un generador

Una característica menos conocida de los generadores es el hecho de que puede comunicarse con ellos mediante dos métodos: send()y throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Al llamar gen.send(), el valor se pasa como un valor de retorno delyield palabra clave.

gen.throw() por otro lado, permite lanzar Excepciones dentro de los generadores, con la excepción levantada en el mismo lugar yield se llamó.

Devolución de valores de generadores

Al devolver un valor de un generador, el valor se coloca dentro de la StopIterationexcepción. Más adelante podemos recuperar el valor de la excepción y usarlo según nuestras necesidades.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

He aquí una nueva palabra clave: yield from

Python 3.4 vino con la adición de una nueva palabra clave: yield from. ¿Qué palabra clave que nos permite hacer, es pasar en cualquier next(), send()y throw()en un generador de más interno anidado. Si el generador interno devuelve un valor, también es el valor de retorno de yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

He escrito un articulo para profundizar en este tema.

Poniendolo todo junto

Al introducir la nueva palabra clave yield fromen Python 3.4, ahora pudimos crear generadores dentro de generadores que, al igual que un túnel, pasan los datos de un lado a otro de los generadores más internos a los más externos. Esto ha dado lugar a un nuevo significado para los generadores: corrutinas .

Las corrutinas son funciones que se pueden detener y reanudar mientras se ejecutan. En Python, se definen mediante la async defpalabra clave. Al igual que los generadores, ellos también usan su propia forma de yield fromlo que es await. Antes asyncy awaitse introdujeron en Python 3.5, creamos corrutinas exactamente de la misma manera que se crearon los generadores (con en yield fromlugar de await).

async def inner():
    return 1

async def outer():
    await inner()

Como todos los iteradores o generadores que implementan el __iter__()método, se implementan corrutinas __await__()que les permiten continuar cada vezawait coro se llama.

Hay un bonito diagrama de secuencia dentro del documentos de Python que debe consultar.

En asyncio, además de las funciones de rutina, tenemos 2 objetos importantes: tareas y futuros .

Futuros

Los futuros son objetos que tienen el __await__()método implementado y su trabajo es mantener un cierto estado y resultado. El estado puede ser uno de los siguientes:

  1. PENDIENTE: el futuro no tiene ningún resultado o conjunto de excepciones.
  2. CANCELADO: el futuro se canceló usando fut.cancel()
  3. FINISHED: el futuro se terminó, ya sea por un conjunto de resultados usando fut.set_result()o por un conjunto de excepciones usandofut.set_exception()

El resultado, tal como lo ha adivinado, puede ser un objeto Python, que se devolverá, o una excepción que se puede generar.

Otra característica importante de los futureobjetos es que contienen un método llamadoadd_done_callback() . Este método permite llamar a las funciones tan pronto como se realiza la tarea, ya sea que haya generado una excepción o haya terminado.

Tareas

Los objetos de tarea son futuros especiales, que envuelven corrutinas y se comunican con las corrutinas más internas y más externas. Cada vez que una corrutina awaites un futuro, el futuro se devuelve a la tarea (como enyield from ) y la tarea lo recibe.

A continuación, la tarea se une al futuro. Lo hace llamandoadd_done_callback() el futuro. A partir de ahora, si el futuro se realiza alguna vez, ya sea cancelando, pasando una excepción o pasando un objeto Python como resultado, se llamará a la devolución de llamada de la tarea y volverá a existir.

Asyncio

La última pregunta candente que debemos responder es: ¿cómo se implementa el IO?

En el fondo de asyncio, tenemos un bucle de eventos. Un ciclo de eventos de tareas. El trabajo del bucle de eventos es llamar a las tareas cada vez que están listas y coordinar todo ese esfuerzo en una sola máquina de trabajo.

La parte IO del bucle de eventos se basa en una única función crucial llamada select. Select es una función de bloqueo, implementada por el sistema operativo debajo, que permite esperar en los sockets para los datos entrantes o salientes. Una vez que se reciben los datos, se activa y devuelve los sockets que recibieron datos o los sockets que están listos para escribir.

Cuando intenta recibir o enviar datos a través de un socket a través de asyncio, lo que realmente sucede a continuación es que el socket se verifica primero si tiene algún dato que pueda leerse o enviarse inmediatamente. Si su .send()búfer está lleno, o el .recv()búfer está vacío, el conector se registra en la selectfunción (simplemente agregándolo a una de las listas, rlistpara recvy wlistpara send) y la función apropiada es awaitun futureobjeto recién creado , vinculado a ese conector.

Cuando todas las tareas disponibles están esperando futuros, el ciclo de eventos llama selecty espera. Cuando uno de los sockets tiene datos entrantes, o su sendbúfer se agota, asyncio busca el objeto futuro vinculado a ese socket y lo configura como hecho.

Ahora ocurre toda la magia. El futuro está listo para terminar, la tarea que se agregó antes con add_done_callback()vuelve a la vida y llama .send()a la corrutina que reanuda la corrutina más interna (debido a la awaitcadena) y usted lee los datos recién recibidos de un búfer cercano. se derramó sobre.

Cadena de métodos nuevamente, en caso de recv():

  1. select.select murga.
  2. Se devuelve un socket listo, con datos.
  3. Los datos del socket se mueven a un búfer.
  4. future.set_result() se llama.
  5. La tarea que se agregó a sí misma add_done_callback()ahora está despertada.
  6. La tarea llama .send()a la corrutina que llega hasta la corrutina más interna y la activa.
  7. Los datos se leen del búfer y se devuelven a nuestro humilde usuario.

En resumen, asyncio usa capacidades de generador, que permiten pausar y reanudar funciones. Utiliza yield fromcapacidades que permiten pasar datos de ida y vuelta desde el generador más interno al más externo. Utiliza todos esos para detener la ejecución de la función mientras espera que IO se complete (mediante el uso de la selectfunción del sistema operativo).

¿Y lo mejor de todo? Mientras una función está en pausa, otra puede ejecutarse y entrelazarse con la delicada tela, que es asyncio.

Bharel
fuente
12
Si necesita más explicación, no dude en comentar. Por cierto, no estoy del todo seguro de si debería haber escrito esto como un artículo de blog o una respuesta en stackoverflow. La pregunta es larga de responder.
Bharel
1
En un socket asincrónico, al intentar enviar o recibir datos, primero se comprueba el búfer del sistema operativo. Si está intentando recibir y no hay datos en el búfer, la función de recepción subyacente devolverá un valor de error que se propagará como una excepción en Python. Lo mismo con el envío y un búfer completo. Cuando se genera la excepción, Python a su vez envía esos sockets a la función de selección que suspende el proceso. Pero no es así como funciona asyncio, sino cómo funcionan los sockets y la selección, lo que también es muy específico del sistema operativo.
Bharel
2
@ user8371915 Siempre aquí para ayudar :-) Tenga en cuenta que para comprender Asyncio debe saber cómo funcionan los generadores, la comunicación y el yield fromfuncionamiento del generador . Sin embargo, noté arriba que se puede omitir en caso de que el lector ya lo sepa :-) ¿Algo más que crea que debería agregar?
Bharel
2
Las cosas antes de la sección de Asyncio son quizás las más críticas, ya que son lo único que el lenguaje realmente hace por sí mismo. El también selectpuede calificar, ya que así es como funcionan las llamadas al sistema de E / S sin bloqueo en el sistema operativo. Las asyncioconstrucciones reales y el bucle de eventos son solo código de nivel de aplicación creado a partir de estas cosas.
MisterMiyagi
3
Esta publicación tiene información de la columna vertebral de E / S asíncrona en Python. Gracias por tan amable explicación.
mjkim
83

Hablar de async/awaity asynciono es lo mismo. La primera es una construcción fundamental de bajo nivel (corrutinas), mientras que la última es una biblioteca que utiliza estas construcciones. Por el contrario, no existe una única respuesta definitiva.

La siguiente es una descripción general de cómo funcionan las bibliotecas async/awaity asynciosimilares. Es decir, puede haber otros trucos en la parte superior (hay ...) pero son intrascendentes a menos que los construya usted mismo. La diferencia debería ser insignificante a menos que ya sepa lo suficiente como para no tener que hacer esa pregunta.

1. Corutinas versus subrutinas en pocas palabras

Al igual que las subrutinas (funciones, procedimientos, ...), las corrutinas (generadores, ...) son una abstracción de la pila de llamadas y el puntero de instrucción: hay una pila de piezas de código en ejecución, y cada una está en una instrucción específica.

La distinción de defversus async defes simplemente para mayor claridad. La diferencia real es returnversus yield. A partir de esto, awaito yield fromtome la diferencia de llamadas individuales a pilas completas.

1.1. Subrutinas

Una subrutina representa un nuevo nivel de pila para contener variables locales y un solo recorrido de sus instrucciones para llegar a un final. Considere una subrutina como esta:

def subfoo(bar):
     qux = 3
     return qux * bar

Cuando lo ejecutas, eso significa

  1. asignar espacio de pila para baryqux
  2. ejecutar recursivamente la primera declaración y saltar a la siguiente declaración
  3. una vez a la vez return, empuja su valor a la pila de llamadas
  4. borre la pila (1.) y el puntero de instrucción (2.)

En particular, 4. significa que una subrutina siempre comienza en el mismo estado. Todo lo exclusivo de la función en sí se pierde al finalizar. No se puede reanudar una función, incluso si hay instrucciones después return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. Corutinas como subrutinas persistentes

Una corrutina es como una subrutina, pero puede salir sin destruir su estado. Considere una corrutina como esta:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

Cuando lo ejecutas, eso significa

  1. asignar espacio de pila para baryqux
  2. ejecutar recursivamente la primera declaración y saltar a la siguiente declaración
    1. una vez a la vez yield, empuja su valor a la pila de llamadas pero almacena la pila y el puntero de instrucción
    2. una vez llamando yield, restaure la pila y el puntero de instrucción y envíe argumentos aqux
  3. una vez a la vez return, empuja su valor a la pila de llamadas
  4. borre la pila (1.) y el puntero de instrucción (2.)

Tenga en cuenta la adición de 2.1 y 2.2: una corrutina se puede suspender y reanudar en puntos predefinidos. Esto es similar a cómo se suspende una subrutina durante la llamada a otra subrutina. La diferencia es que la corrutina activa no está estrictamente ligada a su pila de llamadas. En cambio, una corrutina suspendida es parte de una pila separada y aislada.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

Esto significa que las corrutinas suspendidas se pueden almacenar o mover libremente entre pilas. Cualquier pila de llamadas que tenga acceso a una corrutina puede decidir reanudarla.

1.3. Atravesando la pila de llamadas

Hasta ahora, nuestra corrutina solo baja en la pila de llamadas con yield. Una subrutina puede subir y bajar en la pila de llamadas con returny (). Para que estén completas, las corrutinas también necesitan un mecanismo para subir la pila de llamadas. Considere una corrutina como esta:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

Cuando lo ejecuta, eso significa que todavía asigna la pila y el puntero de instrucción como una subrutina. Cuando se suspende, sigue siendo como almacenar una subrutina.

Sin embargo, yield fromhace ambas cosas . Suspende la pila y el puntero de instrucción wrap y se ejecuta cofoo. Tenga en cuenta que wrappermanece suspendido hasta que cofootermina por completo. Siempre que se cofoosuspende o se envía algo, cofoose conecta directamente a la pila de llamadas.

1.4. Coroutines hasta el final

Según lo establecido, yield frompermite conectar dos visores a través de otro intermedio. Cuando se aplica de forma recursiva, eso significa que la parte superior de la pila se puede conectar a la parte inferior de la pila.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

Tenga en cuenta que rooty coro_bno se conocen el uno al otro. Esto hace que las corrutinas sean mucho más limpias que las devoluciones de llamada: las corrutinas aún se construyen en una relación 1: 1 como las subrutinas. Las corrutinas suspenden y reanudan toda su pila de ejecución existente hasta un punto de llamada regular.

En particular, rootpodría tener un número arbitrario de corrutinas para reanudar. Sin embargo, nunca puede reanudar más de uno al mismo tiempo. Las corrutinas de la misma raíz son concurrentes pero no paralelas.

1.5. Python asyncyawait

Hasta ahora, la explicación ha utilizado explícitamente el vocabulario yieldy yield fromde los generadores: la funcionalidad subyacente es la misma. La nueva sintaxis de Python3.5 asyncy awaitexiste principalmente para mayor claridad.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

Las declaraciones async fory async withson necesarias porque rompería la yield from/awaitcadena con las declaraciones desnudas fory with.

2. Anatomía de un bucle de eventos simple

Por sí misma, una corrutina no tiene el concepto de ceder el control a otra corrutina. Solo puede ceder el control a la persona que llama en la parte inferior de una pila de corrutinas. Esta persona que llama puede cambiar a otra corrutina y ejecutarla.

Este nodo raíz de varias corrutinas es comúnmente un bucle de eventos : en suspensión, una corrutina produce un evento en el que desea reanudar. A su vez, el bucle de eventos es capaz de esperar eficientemente a que ocurran estos eventos. Esto le permite decidir qué corrutina ejecutar a continuación o cómo esperar antes de reanudar.

Tal diseño implica que existe un conjunto de eventos predefinidos que el bucle comprende. Varias corrutinas awaitentre sí, hasta que finalmente se edita un evento await. Este evento puede comunicarse directamente con el bucle de eventos mediante el yieldcontrol.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

La clave es que la suspensión de rutina permite que el bucle de eventos y los eventos se comuniquen directamente. La pila de corrutinas intermedia no requiere ningún conocimiento sobre qué bucle lo está ejecutando, ni cómo funcionan los eventos.

2.1.1. Eventos en el tiempo

El evento más simple de manejar es llegar a un punto en el tiempo. Este es un bloque fundamental de código enhebrado también: un subproceso se repite repetidamente sleephasta que una condición es verdadera. Sin embargo, una sleepejecución de bloques regular por sí sola: queremos que no se bloqueen otras corrutinas. En su lugar, queremos decirle al bucle de eventos cuándo debe reanudar la pila de corrutinas actual.

2.1.2. Definición de un evento

Un evento es simplemente un valor que podemos identificar, ya sea a través de una enumeración, un tipo u otra identidad. Podemos definir esto con una clase simple que almacena nuestro tiempo objetivo. Además de almacenar la información del evento, podemos permitir awaituna clase directamente.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

Esta clase solo almacena el evento, no dice cómo manejarlo realmente.

La única característica especial es __await__: es lo que awaitbusca la palabra clave. Prácticamente, es un iterador pero no está disponible para la maquinaria de iteración regular.

2.2.1. Esperando un evento

Ahora que tenemos un evento, ¿cómo reaccionan las corrutinas? Debemos ser capaces de expresar el equivalente de sleeppor awaiting nuestro evento. Para ver mejor lo que está pasando, esperamos dos veces la mitad del tiempo:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

Podemos instanciar y ejecutar directamente esta corrutina. Similar a un generador, el uso coroutine.sendejecuta la corrutina hasta obtener yieldun resultado.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

Esto nos da dos AsyncSleepeventos y luego una StopIterationcuando se realiza la corrutina. ¡Tenga en cuenta que el único retraso es time.sleepel del bucle! Cada uno AsyncSleepsolo almacena un desplazamiento de la hora actual.

2.2.2. Evento + Sueño

En este punto, tenemos dos mecanismos separados a nuestra disposición:

  • AsyncSleep Eventos que se pueden generar desde el interior de una corrutina
  • time.sleep que puede esperar sin afectar las rutinas

En particular, estos dos son ortogonales: ninguno afecta ni desencadena al otro. Como resultado, podemos idear nuestra propia estrategia sleeppara afrontar el retraso de un AsyncSleep.

2.3. Un bucle de eventos ingenuo

Si disponemos de varias corrutinas, cada una puede indicarnos cuándo quiere que le despierten. Luego podemos esperar hasta que el primero de ellos quiera reanudarse, luego el siguiente, y así sucesivamente. En particular, en cada punto solo nos preocupamos por cuál es el siguiente .

Esto hace que la programación sea sencilla:

  1. ordenar las rutinas por la hora deseada para despertarse
  2. escoge el primero que quiera despertar
  3. espera hasta este momento
  4. ejecutar esta corrutina
  5. repetir desde 1.

Una implementación trivial no necesita conceptos avanzados. A listpermite ordenar las corrutinas por fecha. Esperar es algo habitual time.sleep. La ejecución de corrutinas funciona igual que antes coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

Por supuesto, esto tiene un amplio margen de mejora. Podemos usar un montón para la cola de espera o una tabla de despacho para eventos. También podríamos obtener valores de retorno de StopIterationy asignarlos a la corrutina. Sin embargo, el principio fundamental sigue siendo el mismo.

2.4. Espera cooperativa

El AsyncSleepevento y el runciclo de eventos son una implementación totalmente funcional de eventos cronometrados.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

Esto cambia cooperativamente entre cada una de las cinco corrutinas, suspendiendo cada una durante 0,1 segundos. Aunque el ciclo de eventos es síncrono, aún ejecuta el trabajo en 0,5 segundos en lugar de 2,5 segundos. Cada corrutina mantiene el estado y actúa de forma independiente.

3. Bucle de eventos de E / S

Un bucle de eventos que admita sleepes adecuado para el sondeo . Sin embargo, la espera de E / S en un identificador de archivo se puede hacer de manera más eficiente: el sistema operativo implementa E / S y, por lo tanto, sabe qué identificadores están listos. Idealmente, un bucle de eventos debería admitir un evento explícito "listo para E / S".

3.1. La selectllamada

Python ya tiene una interfaz para consultar el sistema operativo para leer identificadores de E / S. Cuando se llama con identificadores para leer o escribir, devuelve los identificadores listos para leer o escribir:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

Por ejemplo, podemos openescribir un archivo y esperar a que esté listo:

write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])

Una vez que seleccione las devoluciones, writeablecontiene nuestro archivo abierto.

3.2. Evento de E / S básico

Similar a la AsyncSleepsolicitud, necesitamos definir un evento para E / S. Con la selectlógica subyacente , el evento debe referirse a un objeto legible, digamos un openarchivo. Además, almacenamos cuántos datos leer.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

Al igual que con la AsyncSleepmayoría de las veces, solo almacenamos los datos necesarios para la llamada al sistema subyacente. Esta vez, __await__se puede reanudar varias veces, hasta que amountse haya leído lo deseado . Además, obtenemos returnel resultado de E / S en lugar de simplemente reanudarlo.

3.3. Aumento de un bucle de eventos con lectura de E / S

La base de nuestro bucle de eventos sigue siendo la rundefinida anteriormente. Primero, necesitamos rastrear las solicitudes de lectura. Este ya no es un horario ordenado, solo asignamos solicitudes de lectura a corrutinas.

# new
waiting_read = {}  # type: Dict[file, coroutine]

Dado que select.selecttoma un parámetro de tiempo de espera, podemos usarlo en lugar de time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])

Esto nos da todos los archivos legibles; si hay alguno, ejecutamos la corrutina correspondiente. Si no hay ninguno, hemos esperado lo suficiente para que se ejecute nuestra corrutina actual.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

Finalmente, tenemos que escuchar las solicitudes de lectura.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. Poniendo todo junto

Lo anterior fue un poco simplificado. Necesitamos hacer algunos cambios para no morir de hambre a las corrutinas para dormir si siempre podemos leer. Necesitamos manejar no tener nada que leer o nada que esperar. Sin embargo, el resultado final todavía encaja en 30 LOC.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. E / S cooperativa

Las implementaciones AsyncSleep, AsyncReady runahora son completamente funcionales para dormir y / o leer. Igual que para sleepy, podemos definir un ayudante para probar la lectura:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = return await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

Al ejecutar esto, podemos ver que nuestra E / S está intercalada con la tarea en espera:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. E / S sin bloqueo

Si bien la E / S en archivos transmite el concepto, no es realmente adecuado para una biblioteca como asyncio: la selectllamada siempre regresa para los archivos , y ambos openy readpueden bloquearse indefinidamente . Esto bloquea todas las corrutinas de un bucle de eventos, lo cual es malo. Bibliotecas comoaiofiles utilizan subprocesos y sincronización para falsificar eventos y E / S no bloqueantes en el archivo.

Sin embargo, los sockets permiten E / S sin bloqueo, y su latencia inherente lo hace mucho más crítico. Cuando se usa en un bucle de eventos, la espera de datos y el reintento se pueden ajustar sin bloquear nada.

4.1. Evento de E / S sin bloqueo

Similar a nuestro AsyncRead, podemos definir un evento de suspensión y lectura para sockets. En lugar de tomar un archivo, tomamos un socket, que debe ser sin bloqueo. Además, nuestros __await__usos en socket.recvlugar de file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

A diferencia de AsyncRead, __await__realiza E / S verdaderamente sin bloqueo. Cuando hay datos disponibles, siempre se lee. Cuando no hay datos disponibles, siempre se suspende. Eso significa que el bucle de eventos solo se bloquea mientras realizamos un trabajo útil.

4.2. Desbloquear el bucle de eventos

En lo que respecta al bucle de eventos, nada cambia mucho. El evento para escuchar sigue siendo el mismo que para los archivos: un descriptor de archivo marcado como listo por select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

En este punto, debería ser obvio que AsyncReady AsyncRecvson el mismo tipo de evento. Podríamos refactorizarlos fácilmente para que sean un evento con un componente de E / S intercambiable. En efecto, el ciclo de eventos, las corrutinas y los eventos separan claramente un programador, un código intermedio arbitrario y la E / S real.

4.3. El lado feo de la E / S sin bloqueo

En principio, lo que deberías hacer en este punto es replicar la lógica de readas a recvfor AsyncRecv. Sin embargo, esto es mucho más feo ahora: tienes que manejar los retornos tempranos cuando las funciones se bloquean dentro del kernel, pero te dan el control. Por ejemplo, abrir una conexión en lugar de abrir un archivo es mucho más largo:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

En pocas palabras, lo que queda son unas pocas docenas de líneas de manejo de excepciones. Los eventos y el ciclo de eventos ya funcionan en este punto.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

Apéndice

Código de ejemplo en github

MisterMiyagi
fuente
Usar yield selfen AsyncSleep me da un Task got back yielderror, ¿por qué? Veo que el código en asyncio.Futures usa eso. Usar una producción pura funciona bien.
Ron Serruya
1
Los bucles de eventos generalmente solo esperan sus propios eventos. Por lo general, no puede mezclar eventos y bucles de eventos entre bibliotecas; los eventos que se muestran aquí solo funcionan con el ciclo de eventos que se muestra. En concreto, asyncio solo usa None (es decir, un rendimiento mínimo) como señal para el bucle de eventos. Los eventos interactúan directamente con el objeto de bucle de eventos para registrar las activaciones.
MisterMiyagi
12

Su corodesugaring es conceptualmente correcto, pero un poco incompleto.

awaitno se suspende incondicionalmente, pero solo si encuentra una llamada de bloqueo. ¿Cómo sabe que se está bloqueando una llamada? Esto se decide por el código que se espera. Por ejemplo, se podría desaconsejar una implementación esperada de lectura de socket para:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

En asyncio real, el código equivalente modifica el estado de a en Futurelugar de devolver valores mágicos, pero el concepto es el mismo. Cuando se adapta adecuadamente a un objeto similar a un generador, el código anterior se puede awaiteditar.

En el lado de la persona que llama, cuando su corrutina contiene:

data = await read(sock, 1024)

Se convierte en algo parecido a:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

Las personas familiarizadas con los generadores tienden a describir lo anterior en términos de yield fromcuál hace la suspensión automáticamente.

La cadena de suspensión continúa hasta el bucle de eventos, que advierte que la corrutina está suspendida, la elimina del conjunto ejecutable y continúa para ejecutar las corrutinas que se pueden ejecutar, si las hay. Si no se pueden ejecutar corrutinas, el bucle esperaselect() hasta que un descriptor de archivo en el que una corrutina está interesada esté listo para IO. (El bucle de eventos mantiene una asignación de descriptor de archivo a una rutina).

En el ejemplo anterior, una vez que select()le dice al bucle de eventos que sockes legible, se volverá a agregar coroal conjunto ejecutable, por lo que continuará desde el punto de suspensión.

En otras palabras:

  1. Todo sucede en el mismo hilo por defecto.

  2. El bucle de eventos es responsable de programar las corrutinas y despertarlas cuando lo que sea que estaban esperando (generalmente una llamada IO que normalmente se bloquearía o un tiempo de espera) esté listo.

Para obtener información sobre los bucles de eventos de conducción de corrutinas, recomiendo esta charla de Dave Beazley, donde demuestra la codificación de un bucle de eventos desde cero frente a una audiencia en vivo.

usuario4815162342
fuente
Gracias, esto está más cerca de lo que busco, pero esto todavía no explica por qué async.wait_for()no hace lo que se supone que debe hacer ... ¿Por qué es un problema tan grande agregar una devolución de llamada al bucle de eventos y decirlo? para procesar todas las devoluciones de llamada que necesite, incluida la que acaba de agregar? Mi frustración con asynciose debe en parte al hecho de que el concepto subyacente es muy simple y, por ejemplo, Emacs Lisp se implementó durante años, sin usar palabras de moda ... (es decir, create-async-processy accept-process-output- y esto es todo lo que se necesita ... (cont.)
wvxvw
10
@wvxvw He hecho todo lo que pude para responder la pregunta que publicaste, tanto como sea posible dado que solo el último párrafo contiene seis preguntas. Y así continuamos, no es que wait_for no haga lo que se supone que debe hacer (lo hace, es una corrutina que se supone que debe esperar), es que sus expectativas no coinciden con lo que el sistema fue diseñado e implementado para hacer. Creo que su problema podría coincidir con asyncio si el bucle de eventos se ejecutara en un hilo separado, pero no conozco los detalles de su caso de uso y, honestamente, su actitud no hace que sea muy divertido ayudarlo.
user4815162342
5
@wvxvw My frustration with asyncio is in part due to the fact that the underlying concept is very simple, and, for example, Emacs Lisp had implementation for ages, without using buzzwords...- Entonces , nada te impide implementar este concepto simple sin palabras de moda para Python :) ¿Por qué usas este asincio feo? Implementa el tuyo desde cero. Por ejemplo, puede comenzar creando su propia async.wait_for()función que haga exactamente lo que se supone que debe hacer.
Mikhail Gerasimov
1
@MikhailGerasimov, parece que crees que es una pregunta retórica. Pero me gustaría disipar el misterio por ti. El lenguaje está diseñado para hablar con otros. No puedo elegir para los demás qué idioma hablan, incluso si creo que el idioma que hablan es basura, lo mejor que puedo hacer es tratar de convencerlos de que ese es el caso. En otras palabras, si tuviera la libertad de elegir, nunca elegiría Python para empezar, y mucho menos asyncio. Pero, en principio, esa no es mi decisión. Me obligan a usar lenguaje basura a través de en.wikipedia.org/wiki/Ultimatum_game .
wvxvw
4

Todo se reduce a los dos desafíos principales que asyncio está abordando:

  • ¿Cómo realizar múltiples E / S en un solo hilo?
  • ¿Cómo implementar la multitarea cooperativa?

La respuesta al primer punto ha existido durante mucho tiempo y se llama ciclo de selección . En Python, se implementa en el módulo de selectores .

La segunda pregunta está relacionada con el concepto de corrutina , es decir, funciones que pueden detener su ejecución y ser restauradas posteriormente. En Python, las corrutinas se implementan usando generadores y el rendimiento de la declaración. Eso es lo que se esconde detrás de la sintaxis async / await .

Más recursos en esta respuesta .


EDITAR: Abordar su comentario sobre goroutines:

El equivalente más cercano a una goroutine en asyncio no es en realidad una corrutina sino una tarea (vea la diferencia en la documentación ). En Python, una corrutina (o un generador) no sabe nada sobre los conceptos de bucle de eventos o E / S. Simplemente es una función que puede detener su ejecución yieldmientras mantiene su estado actual, por lo que se puede restaurar más adelante. La yield fromsintaxis permite encadenarlos de forma transparente.

Ahora, dentro de una tarea de asyncio, la corrutina en la parte inferior de la cadena siempre termina dando un futuro . Este futuro luego sube al bucle de eventos y se integra en la maquinaria interna. Cuando el futuro está configurado como hecho por otra devolución de llamada interna, el bucle de eventos puede restaurar la tarea enviando el futuro de regreso a la cadena de corrutinas.


EDITAR: Abordar algunas de las preguntas en su publicación:

¿Cómo ocurre realmente la E / S en este escenario? ¿En un hilo separado? ¿Está suspendido todo el intérprete y la E / S ocurre fuera del intérprete?

No, no pasa nada en un hilo. La E / S siempre es administrada por el bucle de eventos, principalmente a través de descriptores de archivo. Sin embargo, el registro de esos descriptores de archivo suele estar oculto por corrutinas de alto nivel, lo que le hace el trabajo sucio.

¿Qué se entiende exactamente por E / S? Si mi procedimiento de Python llamó al procedimiento C open () y, a su vez, envió una interrupción al kernel, cediéndole el control, ¿cómo sabe el intérprete de Python sobre esto y puede continuar ejecutando algún otro código, mientras que el código del kernel hace la I / O y hasta que despierte el procedimiento de Python que envió la interrupción originalmente? ¿Cómo puede el intérprete de Python, en principio, darse cuenta de que esto está sucediendo?

Una E / S es cualquier llamada de bloqueo. En asyncio, todas las operaciones de E / S deben pasar por el bucle de eventos, porque como dijiste, el bucle de eventos no tiene forma de saber que se está realizando una llamada de bloqueo en algún código síncrono. Eso significa que se supone que no debes usar un sincronizador opendentro del contexto de una corrutina. En su lugar, utilice una biblioteca dedicada como archivos ai que proporcione una versión asincrónica de open.

Vincent
fuente
Decir que las corrutinas se implementan usando yield fromrealmente no dice nada. yield fromes solo una construcción de sintaxis, no es un bloque de construcción fundamental que las computadoras puedan ejecutar. Del mismo modo, para select loop. Sí, las corrutinas en Go también usan el bucle de selección, pero lo que estaba tratando de hacer funcionaría en Go, pero no en Python. Necesito respuestas más detalladas para comprender por qué no funcionó.
wvxvw
Lo siento ... no, en realidad no. "futuro", "tarea", "vía transparente", "rendimiento de" son sólo palabras de moda, no son objetos del dominio de la programación. la programación tiene variables, procedimientos y estructuras. Entonces, decir que "goroutine es una tarea" es solo una declaración circular que plantea una pregunta. En última instancia, una explicación de lo que asynciohace, para mí, se reduciría al código C que ilustra a qué se tradujo la sintaxis de Python.
wvxvw
Para explicar con más detalle por qué su respuesta no responde a mi pregunta: con toda la información que proporcionó, no tengo idea de por qué mi intento con el código que publiqué en la pregunta vinculada no funcionó. Estoy absolutamente seguro de que podría escribir un bucle de eventos de tal manera que este código funcione. De hecho, esta sería la forma en que escribiría un bucle de eventos, si tuviera que escribir uno.
wvxvw
7
@wvxvw No estoy de acuerdo. Esas no son "palabras de moda" sino conceptos de alto nivel que se han implementado en muchas bibliotecas. Por ejemplo, una tarea asyncio, un greenlet gevent y una goroutine corresponden a lo mismo: una unidad de ejecución que se puede ejecutar simultáneamente dentro de un solo hilo. Además, no creo que C sea necesario para comprender asyncio en absoluto, a menos que desee profundizar en el funcionamiento interno de los generadores de Python.
Vincent
@wvxvw Ver mi segunda edición. Esto debería aclarar algunos conceptos erróneos.
Vincent