La biblioteca de Akka Streams ya viene con una gran cantidad de documentación . Sin embargo, el principal problema para mí es que proporciona demasiado material: me siento abrumado por la cantidad de conceptos que tengo que aprender. Muchos ejemplos mostrados allí se sienten muy pesados y no se pueden traducir fácilmente a casos de uso del mundo real y, por lo tanto, son bastante esotéricos. Creo que da demasiados detalles sin explicar cómo construir todos los bloques de construcción juntos y cómo exactamente ayuda a resolver problemas específicos.
Hay fuentes, sumideros, flujos, etapas de gráficos, gráficos parciales, materialización, un DSL de gráficos y mucho más y simplemente no sé por dónde empezar. La guía de inicio rápido está destinada a ser un punto de partida, pero no la entiendo. Simplemente incluye los conceptos mencionados anteriormente sin explicarlos. Además, los ejemplos de código no se pueden ejecutar: faltan partes, lo que me hace más o menos imposible seguir el texto.
¿Alguien puede explicar los conceptos de fuentes, sumideros, flujos, etapas de gráficos, gráficos parciales, materialización y tal vez algunas otras cosas que omití en palabras simples y con ejemplos fáciles que no explican cada detalle (y que probablemente no sean necesarios de todos modos en el principio)?
fuente
Respuestas:
Esta respuesta se basa en la
akka-stream
versión2.4.2
. La API puede ser ligeramente diferente en otras versiones. La dependencia puede ser consumida por sbt :Muy bien, comencemos. La API de Akka Streams consta de tres tipos principales. A diferencia de las secuencias reactivas , estos tipos son mucho más potentes y, por lo tanto, más complejos. Se supone que para todos los ejemplos de código ya existen las siguientes definiciones:
Las
import
declaraciones son necesarias para las declaraciones de tipo.system
representa el sistema de actores de Akka ymaterializer
representa el contexto de evaluación de la transmisión. En nuestro caso usamos aActorMaterializer
, lo que significa que las transmisiones se evalúan sobre los actores. Ambos valores están marcados comoimplicit
, lo que le da al compilador de Scala la posibilidad de inyectar estas dos dependencias automáticamente cuando sea necesario. También importamossystem.dispatcher
, que es un contexto de ejecución paraFutures
.Una nueva API
Akka Streams tiene estas propiedades clave:
Materializer
.Source
,Sink
yFlow
. Los bloques de construcción forman un gráfico cuya evaluación se basa enMaterializer
y debe activarse explícitamente.A continuación se presentará una introducción más profunda sobre cómo usar los tres tipos principales.
Fuente
A
Source
es un creador de datos, sirve como fuente de entrada para la transmisión. Cada unoSource
tiene un solo canal de salida y ningún canal de entrada. Todos los datos fluyen a través del canal de salida a lo que esté conectado alSource
.Imagen tomada de boldradius.com .
A
Source
se puede crear de múltiples maneras:En los casos anteriores, los alimentamos
Source
con datos finitos, lo que significa que eventualmente terminarán. No hay que olvidar que los flujos reactivos son vagos y asíncronos de forma predeterminada. Esto significa que uno tiene que solicitar explícitamente la evaluación de la secuencia. En Akka Streams esto se puede hacer a través de losrun*
métodos. ElrunForeach
no sería diferente a laforeach
función bien conocida : a través de larun
adición, hace explícito que solicitemos una evaluación de la secuencia. Como los datos finitos son aburridos, continuamos con uno infinito:Con el
take
método podemos crear un punto de parada artificial que nos impide evaluar indefinidamente. Como el soporte para actores está integrado, también podemos alimentar fácilmente la transmisión con mensajes que se envían a un actor:Podemos ver que
Futures
se ejecutan de forma asincrónica en diferentes subprocesos, lo que explica el resultado. En el ejemplo anterior, no es necesario un búfer para los elementos entrantes y, porOverflowStrategy.fail
lo tanto , podemos configurar que la secuencia falle en un desbordamiento del búfer. Especialmente a través de esta interfaz de actor, podemos alimentar la transmisión a través de cualquier fuente de datos. No importa si los datos son creados por el mismo hilo, por otro diferente, por otro proceso o si provienen de un sistema remoto a través de Internet.Lavabo
A
Sink
es básicamente lo contrario de aSource
. Es el punto final de una secuencia y, por lo tanto, consume datos. ASink
tiene un solo canal de entrada y ningún canal de salida.Sinks
son especialmente necesarios cuando queremos especificar el comportamiento del recopilador de datos de forma reutilizable y sin evaluar la secuencia. Losrun*
métodos ya conocidos no nos permiten estas propiedades, por lo tanto, se prefiere usarSink
en su lugar.Imagen tomada de boldradius.com .
Un breve ejemplo de un
Sink
en acción:La conexión de a
Source
a aSink
se puede hacer con elto
método. Devuelve un llamadoRunnableFlow
, que es como veremos más adelante una forma especial de aFlow
- una secuencia que se puede ejecutar simplemente llamando a surun()
método.Imagen tomada de boldradius.com .
Por supuesto, es posible reenviar todos los valores que llegan a un sumidero a un actor:
Fluir
Las fuentes de datos y los sumideros son excelentes si necesita una conexión entre las transmisiones de Akka y un sistema existente, pero uno realmente no puede hacer nada con ellas. Los flujos son la última pieza que falta en la abstracción base de Akka Streams. Actúan como un conector entre diferentes flujos y se pueden usar para transformar sus elementos.
Imagen tomada de boldradius.com .
Si un
Flow
está conectado aSource
un nuevoSource
es el resultado. Del mismo modo, unFlow
conectado a unSink
crea un nuevoSink
. Y unFlow
conectado con aSource
y aSink
resulta en aRunnableFlow
. Por lo tanto, se sientan entre el canal de entrada y el de salida, pero por sí mismos no corresponden a uno de los sabores siempre que no estén conectados a aSource
o aSink
.Imagen tomada de boldradius.com .
Para obtener una mejor comprensión de
Flows
, veremos algunos ejemplos:Mediante el
via
método podemos conectar aSource
con aFlow
. Necesitamos especificar el tipo de entrada porque el compilador no puede inferirlo por nosotros. Como ya podemos ver en este sencillo ejemplo, los flujosinvert
ydouble
son completamente independientes de cualquier productor y consumidor de datos. Solo transforman los datos y los reenvían al canal de salida. Esto significa que podemos reutilizar un flujo entre múltiples flujos:s1
ys2
representan flujos completamente nuevos: no comparten ningún dato a través de sus componentes básicos.Flujos de datos ilimitados
Antes de continuar, primero debemos volver a visitar algunos de los aspectos clave de las secuencias reactivas. Un número ilimitado de elementos puede llegar a cualquier punto y puede poner una secuencia en diferentes estados. Además de un flujo ejecutable, que es el estado habitual, un flujo puede detenerse ya sea por un error o por una señal que denota que no llegarán más datos. Una secuencia se puede modelar de forma gráfica marcando eventos en una línea de tiempo, como es el caso aquí:
Imagen tomada de La introducción a la Programación reactiva que te has estado perdiendo .
Ya hemos visto flujos ejecutables en los ejemplos de la sección anterior. Obtenemos un
RunnableGraph
cada vez que una secuencia se puede materializar, lo que significa que aSink
está conectado a aSource
. Hasta ahora siempre nos hemos materializado en el valorUnit
, que se puede ver en los tipos:Para
Source
ySink
el segundo parámetro de tipo y paraFlow
el tercer parámetro de tipo denotan el valor materializado. A lo largo de esta respuesta, no se explicará el significado completo de la materialización. Sin embargo, se pueden encontrar más detalles sobre la materialización en la documentación oficial . Por ahora, lo único que necesitamos saber es que el valor materializado es lo que obtenemos cuando ejecutamos una secuencia. Como hasta ahora solo estábamos interesados en los efectos secundarios, obtuvimosUnit
el valor materializado. La excepción a esto fue la materialización de un sumidero, que resultó en aFuture
. Nos devolvió unFuture
, ya que este valor puede denotar cuando la secuencia que está conectada al sumidero ha finalizado. Hasta ahora, los ejemplos de código anteriores fueron agradables para explicar el concepto, pero también fueron aburridos porque solo tratamos con flujos finitos o con infinitos muy simples. Para hacerlo más interesante, a continuación se explicará una secuencia asíncrona completa y sin límites.Ejemplo de ClickStream
Como ejemplo, queremos tener una secuencia que capture eventos de clic. Para hacerlo más desafiante, digamos que también queremos agrupar los eventos de clics que ocurren en poco tiempo uno después del otro. De esta manera, podríamos descubrir fácilmente clics dobles, triples o diez veces. Además, queremos filtrar todos los clics individuales. Respira hondo e imagina cómo resolverías ese problema de manera imperativa. Apuesto a que nadie podría implementar una solución que funcione correctamente en el primer intento. De manera reactiva, este problema es trivial de resolver. De hecho, la solución es tan simple y sencilla de implementar que incluso podemos expresarla en un diagrama que describa directamente el comportamiento del código:
Imagen tomada de La introducción a la Programación reactiva que te has estado perdiendo .
Los cuadros grises son funciones que describen cómo una secuencia se transforma en otra. Con la
throttle
función acumulamos clics dentro de 250 milisegundos, las funcionesmap
yfilter
deben explicarse por sí mismas. Las esferas de color representan un evento y las flechas representan cómo fluyen a través de nuestras funciones. Más adelante en los pasos de procesamiento, obtenemos cada vez menos elementos que fluyen a través de nuestra secuencia, ya que los agrupamos y filtramos. El código para esta imagen se vería así:¡Toda la lógica se puede representar en solo cuatro líneas de código! En Scala, podríamos escribirlo aún más corto:
La definición de
clickStream
es un poco más compleja, pero este es solo el caso porque el programa de ejemplo se ejecuta en la JVM, donde la captura de eventos de clic no es fácilmente posible. Otra complicación es que Akka por defecto no proporciona lathrottle
función. En cambio, tuvimos que escribirlo nosotros mismos. Dado que esta función es (como es el caso de las funcionesmap
ofilter
) reutilizable en diferentes casos de uso, no cuento estas líneas en la cantidad de líneas que necesitábamos para implementar la lógica. Sin embargo, en lenguajes imperativos, es normal que la lógica no se pueda reutilizar tan fácilmente y que los diferentes pasos lógicos sucedan en un solo lugar en lugar de aplicarse secuencialmente, lo que significa que probablemente habríamos deformado nuestro código con la lógica de limitación. El ejemplo de código completo está disponible comoesencia y no se discutirá aquí más adelante.Ejemplo de SimpleWebServer
Lo que debería discutirse en su lugar es otro ejemplo. Si bien la secuencia de clics es un buen ejemplo para permitir que Akka Streams maneje un ejemplo del mundo real, carece del poder para mostrar la ejecución paralela en acción. El siguiente ejemplo representará un pequeño servidor web que puede manejar múltiples solicitudes en paralelo. El servidor web podrá aceptar conexiones entrantes y recibir secuencias de bytes de ellos que representen signos ASCII imprimibles. Estas secuencias de bytes o cadenas deben dividirse en todos los caracteres de nueva línea en partes más pequeñas. Después de eso, el servidor responderá al cliente con cada una de las líneas divididas. Alternativamente, podría hacer algo más con las líneas y dar un token de respuesta especial, pero queremos mantenerlo simple en este ejemplo y, por lo tanto, no introducir ninguna característica sofisticada. Recuerda, el servidor necesita poder manejar múltiples solicitudes al mismo tiempo, lo que básicamente significa que ninguna solicitud puede bloquear ninguna otra solicitud para su posterior ejecución. Resolver todos estos requisitos puede ser difícil de una manera imperativa; sin embargo, con Akka Streams, no deberíamos necesitar más que unas pocas líneas para resolver ninguno de estos. Primero, tengamos una visión general sobre el servidor en sí:
Básicamente, solo hay tres bloques de construcción principales. El primero debe aceptar las conexiones entrantes. El segundo debe manejar las solicitudes entrantes y el tercero debe enviar una respuesta. Implementar todos estos tres bloques de construcción es solo un poco más complicado que implementar la secuencia de clics:
La función
mkServer
toma (además de la dirección y el puerto del servidor) también un sistema de actor y un materializador como parámetros implícitos. El flujo de control del servidor está representado porbinding
, que toma una fuente de conexiones entrantes y las reenvía a un sumidero de conexiones entrantes. Dentro deconnectionHandler
, que es nuestro sumidero, manejamos cada conexión por el flujoserverLogic
, que se describirá más adelante.binding
devuelve unFuture
, que se completa cuando el servidor se ha iniciado o el inicio falló, lo que podría ser el caso cuando el puerto ya está ocupado por otro proceso. Sin embargo, el código no refleja completamente el gráfico ya que no podemos ver un bloque de construcción que maneje las respuestas. La razón de esto es que la conexión ya proporciona esta lógica por sí misma. Es un flujo bidireccional y no solo unidireccional como los flujos que hemos visto en los ejemplos anteriores. Como fue el caso de la materialización, tales flujos complejos no se explicarán aquí. La documentación oficial tiene mucho material para cubrir gráficos de flujo más complejos. Por ahora es suficiente saber queTcp.IncomingConnection
representa una conexión que sabe cómo recibir solicitudes y cómo enviar respuestas. La parte que aún falta es laserverLogic
bloque de construcción. Puede verse así:Una vez más, podemos dividir la lógica en varios bloques de construcción simples que forman el flujo de nuestro programa. Primero queremos dividir nuestra secuencia de bytes en líneas, lo que tenemos que hacer cada vez que encontramos un carácter de nueva línea. Después de eso, los bytes de cada línea deben convertirse en una cadena porque trabajar con bytes sin procesar es engorroso. En general, podríamos recibir una secuencia binaria de un protocolo complicado, lo que haría que trabajar con los datos sin procesar entrantes sea extremadamente desafiante. Una vez que tenemos una cadena legible, podemos crear una respuesta. Por razones de simplicidad, la respuesta puede ser cualquier cosa en nuestro caso. Al final, tenemos que convertir nuestra respuesta a una secuencia de bytes que se pueden enviar por cable. El código para toda la lógica puede verse así:
Ya sabemos que
serverLogic
es un flujo que toma aByteString
y tiene que producir aByteString
. Condelimiter
podemos dividir unByteString
en partes más pequeñas, en nuestro caso tiene que suceder cada vez que se produce un carácter de nueva línea.receiver
es el flujo que toma todas las secuencias de bytes divididas y las convierte en una cadena. Por supuesto, esta es una conversión peligrosa, ya que solo los caracteres ASCII imprimibles deben convertirse en una cadena, pero para nuestras necesidades es lo suficientemente bueno.responder
es el último componente y es responsable de crear una respuesta y convertir la respuesta nuevamente en una secuencia de bytes. A diferencia del gráfico, no dividimos este último componente en dos, ya que la lógica es trivial. Al final, conectamos todos los flujos a través delvia
función. En este punto, uno puede preguntarse si nos ocupamos de la propiedad multiusuario que se mencionó al principio. Y, de hecho, lo hicimos a pesar de que puede no ser obvio de inmediato. Al mirar este gráfico, debería ser más claro:El
serverLogic
componente no es más que un flujo que contiene flujos más pequeños. Este componente toma una entrada, que es una solicitud, y produce una salida, que es la respuesta. Dado que los flujos se pueden construir varias veces y todos funcionan de forma independiente entre sí, a través de este anidamiento logramos nuestra propiedad multiusuario. Cada solicitud se maneja dentro de su propia solicitud y, por lo tanto, una solicitud de ejecución corta puede anular una solicitud de ejecución larga iniciada anteriormente. En caso de que se lo haya preguntado, la definición deserverLogic
eso que se mostró anteriormente, por supuesto, se puede escribir mucho más corta al incluir la mayoría de sus definiciones internas:Una prueba del servidor web puede verse así:
Para que el ejemplo de código anterior funcione correctamente, primero debemos iniciar el servidor, que se describe en el
startServer
script:El ejemplo de código completo de este simple servidor TCP se puede encontrar aquí . No solo podemos escribir un servidor con Akka Streams sino también el cliente. Puede verse así:
El código completo del cliente TCP se puede encontrar aquí . El código se ve bastante similar, pero a diferencia del servidor, ya no tenemos que administrar las conexiones entrantes.
Gráficos complejos
En las secciones anteriores hemos visto cómo podemos construir programas simples a partir de flujos. Sin embargo, en realidad a menudo no es suficiente confiar solo en funciones ya integradas para construir flujos más complejos. Si queremos poder usar Akka Streams para programas arbitrarios, necesitamos saber cómo construir nuestras propias estructuras de control personalizadas y flujos combinables que nos permitan abordar la complejidad de nuestras aplicaciones. La buena noticia es que Akka Streams se diseñó para adaptarse a las necesidades de los usuarios y para darle una breve introducción a las partes más complejas de Akka Streams, agregamos algunas características más a nuestro ejemplo de cliente / servidor.
Una cosa que aún no podemos hacer es cerrar una conexión. En este punto, comienza a complicarse un poco más porque la API de transmisión que hemos visto hasta ahora no nos permite detener una transmisión en un punto arbitrario. Sin embargo, existe la
GraphStage
abstracción, que puede usarse para crear etapas de procesamiento de gráficos arbitrarias con cualquier número de puertos de entrada o salida. Primero echemos un vistazo al lado del servidor, donde presentamos un nuevo componente, llamadocloseConnection
:Esta API parece mucho más engorrosa que la API de flujo. No es de extrañar, tenemos que hacer muchos pasos imprescindibles aquí. A cambio, tenemos más control sobre el comportamiento de nuestras transmisiones. En el ejemplo anterior, solo especificamos un puerto de entrada y uno de salida y los ponemos a disposición del sistema anulando el
shape
valor. Además, definimos un llamadoInHandler
y unOutHandler
, que son en este orden responsables de recibir y emitir elementos. Si mira de cerca el ejemplo de flujo de clics completo, ya debería reconocer estos componentes. En el,InHandler
tomamos un elemento y si es una cadena con un solo carácter'q'
, queremos cerrar la secuencia. Para darle al cliente la oportunidad de descubrir que la secuencia se cerrará pronto, emitimos la cadena"BYE"
e inmediatamente cerramos el escenario luego. ElcloseConnection
componente se puede combinar con una corriente a través delvia
método, que se introdujo en la sección sobre flujos.Además de poder cerrar las conexiones, también sería bueno si pudiéramos mostrar un mensaje de bienvenida a una conexión recién creada. Para hacer esto, una vez más tenemos que ir un poco más allá:
La función
serverLogic
ahora toma la conexión entrante como parámetro. Dentro de su cuerpo usamos un DSL que nos permite describir el comportamiento complejo de la transmisión. Conwelcome
creamos una secuencia que solo puede emitir un elemento: el mensaje de bienvenida.logic
es lo que se describióserverLogic
en la sección anterior. La única diferencia notable es que le agregamoscloseConnection
. Ahora en realidad viene la parte interesante del DSL. LaGraphDSL.create
función pone ab
disposición un generador , que se utiliza para expresar la secuencia como un gráfico. Con la~>
función es posible conectar puertos de entrada y salida entre sí. ElConcat
componente que se usa en el ejemplo puede concatenar elementos y aquí se usa para anteponer el mensaje de bienvenida frente a los otros elementos que salen deinternalLogic
. En la última línea, solo ponemos a disposición el puerto de entrada de la lógica del servidor y el puerto de salida de la secuencia concatenada porque todos los demás puertos seguirán siendo un detalle de implementación delserverLogic
componente. Para una introducción en profundidad al gráfico DSL de Akka Streams, visite la sección correspondiente en la documentación oficial . El ejemplo de código completo del complejo servidor TCP y de un cliente que puede comunicarse con él se puede encontrar aquí . Cada vez que abra una nueva conexión desde el cliente, debería ver un mensaje de bienvenida y al escribir"q"
en el cliente debería ver un mensaje que le indica que la conexión se ha cancelado.Todavía hay algunos temas que no fueron cubiertos por esta respuesta. Especialmente la materialización puede asustar a un lector u otro, pero estoy seguro de que con el material que se cubre aquí, todos deberían poder seguir los próximos pasos por sí mismos. Como ya se dijo, la documentación oficial es un buen lugar para continuar aprendiendo sobre Akka Streams.
fuente