pasar una secuencia de Akka a un servicio ascendente para poblar

9

Necesito llamar a un servicio ascendente (Azure Blob Service) para enviar datos a un OutputStream, que luego necesito dar vuelta y devolverlo al cliente, a través de akka. Sin akka (y solo el código de servlet), obtendría el ServletOutputStream y lo pasaría al método del servicio azul.

Lo más cerca que puedo tratar de tropezar, y claramente esto está mal, es algo como esto

        Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

La idea es que estoy llamando a un servicio ascendente para obtener un flujo de salida poblado llamando a blobClient.download (os);

Parece que la función lambda se llama y regresa, pero luego falla, porque no hay datos o algo así. Como si no se supone que debo hacer que la función lambda haga el trabajo, pero ¿quizás devuelva algún objeto que haga el trabajo? No estoy seguro.

¿Cómo se hace esto?

MeBigFatGuy
fuente
¿Cuál es el comportamiento de download? ¿Transmite datos osy solo regresa una vez que los datos se escriben?
Alec

Respuestas:

2

El problema real aquí es que la API de Azure no está diseñada para contrapresión. No hay forma de que la secuencia de salida le envíe una señal a Azure que no está lista para recibir más datos. Para decirlo de otra manera: si Azure empuja los datos más rápido de lo que puede consumirlos, tendrá que haber alguna falla fea de desbordamiento del búfer en alguna parte.

Aceptando este hecho, lo mejor que podemos hacer es:

  • Úselo Source.lazySourcepara comenzar a descargar datos solo cuando haya una demanda posterior (es decir, se está ejecutando la fuente y se están solicitando datos).
  • Ponga la downloadllamada en algún otro hilo para que continúe ejecutándose sin bloquear el retorno de la fuente. Una forma de hacerlo es con un Future(no estoy seguro de cuáles son las mejores prácticas de Java, pero debería funcionar bien de cualquier manera). Aunque inicialmente no importará, es posible que deba elegir un contexto de ejecución que no sea system.dispatcher: todo depende de si downloadestá bloqueando o no.

Me disculpo de antemano si este código Java está mal formado: uso Akka con Scala, por lo que todo esto es al mirar la API de Akka Java y la referencia de sintaxis Java.

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // Wait until there is downstream demand to intialize the source...
  Source.lazySource(() -> {
    // Pre-materialize the outputstream before the source starts running
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
Alec
fuente
Fantástico. muchas gracias. Una pequeña edición de su ejemplo es: Futures.future (() -> {blobClient.download (pair.first ()); return pair.first ();}, system.getDispatcher ());
MeBigFatGuy
@MeBigFatGuy Bien, gracias!
Alec
1

En OutputStreameste caso, es el "valor materializado" del Sourcey solo se creará una vez que se ejecute la secuencia (o "materializada" en una secuencia en ejecución). Ejecutarlo está fuera de su control, ya que le entrega el SourceAkka HTTP y luego ejecutará su fuente.

.mapMaterializedValue(matval -> ...)generalmente se usa para transformar el valor materializado, pero dado que se invoca como parte de la materialización, puede usarlo para hacer efectos secundarios como enviar el matval en un mensaje, tal como lo ha descubierto, no hay necesariamente nada malo en que incluso si se ve funky. Es importante comprender que la secuencia no completará su materialización y se ejecutará hasta que se complete esa lambda. Esto significa problemas si download()está bloqueando en lugar de perder algo de trabajo en un hilo diferente e inmediatamente regresando.

Sin embargo, existe otra solución: Source.preMaterialize()materializa la fuente y le brinda una parte Pairdel valor materializado y una nueva Sourceque puede usarse para consumir la fuente ya iniciada:

Pair<OutputStream, Source<ByteString, NotUsed>> pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source<ByteString, NotUsed> source = pair.second();

Tenga en cuenta que hay algunas cosas adicionales en las que debe pensar en su código, lo más importante si la blobClient.download(os)llamada se bloquea hasta que se hace y usted lo llama desde el actor, en ese caso, debe asegurarse de que su actor no muera de hambre al despachador y detener otros actores de su aplicación desde la ejecución (consulte los documentos de Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).

johanandren
fuente
1
Gracias por la respuesta. No veo cómo esto podría funcionar. ¿a dónde van los bytes cuando se llama a blobClient.download (os) (si lo llamo yo mismo)? Imagine que hay un terabyte de datos esperando a ser escritos. me parece que la llamada blobClient.download debe invocarse desde la llamada sender.tell para que esto sea básicamente una operación similar a IOUtils.copy. Usando preMaterialize no puedo ver cómo sucede.
MeBigFatGuy
OutputStream tiene un búfer interno, comenzará a aceptar escrituras hasta que ese búfer se llene, si el asíncrono en sentido descendente no ha comenzado a consumir elementos para entonces, bloqueará el hilo de escritura (es por eso que mencioné que es importante manejar el bloqueo).
johanandren
1
Pero si preMaterialize y obtengo OutputStream, entonces es mi código el que está haciendo blobClient.download (os); ¿correcto? Eso significa que tiene que completarse antes de que pueda continuar, lo cual es imposible.
MeBigFatGuy
Si la descarga (os) no se bifurca de un hilo, tendrá que lidiar con el bloqueo y asegurarse de que no detenga ninguna otra operación. Una forma sería bifurcar un hilo para hacer el trabajo, otra sería responder primero del actor y luego hacer el trabajo de bloqueo allí, en ese caso debe asegurarse de que el actor no muera de hambre a otros actores, vea el enlace al final de mi respuesta.
johanandren
en este punto solo estoy tratando de que funcione. Ni siquiera puede procesar un archivo de 10 bytes.
MeBigFatGuy