Necesito llamar a un servicio ascendente (Azure Blob Service) para enviar datos a un OutputStream, que luego necesito dar vuelta y devolverlo al cliente, a través de akka. Sin akka (y solo el código de servlet), obtendría el ServletOutputStream y lo pasaría al método del servicio azul.
Lo más cerca que puedo tratar de tropezar, y claramente esto está mal, es algo como esto
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
La idea es que estoy llamando a un servicio ascendente para obtener un flujo de salida poblado llamando a blobClient.download (os);
Parece que la función lambda se llama y regresa, pero luego falla, porque no hay datos o algo así. Como si no se supone que debo hacer que la función lambda haga el trabajo, pero ¿quizás devuelva algún objeto que haga el trabajo? No estoy seguro.
¿Cómo se hace esto?
fuente
download
? ¿Transmite datosos
y solo regresa una vez que los datos se escriben?Respuestas:
El problema real aquí es que la API de Azure no está diseñada para contrapresión. No hay forma de que la secuencia de salida le envíe una señal a Azure que no está lista para recibir más datos. Para decirlo de otra manera: si Azure empuja los datos más rápido de lo que puede consumirlos, tendrá que haber alguna falla fea de desbordamiento del búfer en alguna parte.
Aceptando este hecho, lo mejor que podemos hacer es:
Source.lazySource
para comenzar a descargar datos solo cuando haya una demanda posterior (es decir, se está ejecutando la fuente y se están solicitando datos).download
llamada en algún otro hilo para que continúe ejecutándose sin bloquear el retorno de la fuente. Una forma de hacerlo es con unFuture
(no estoy seguro de cuáles son las mejores prácticas de Java, pero debería funcionar bien de cualquier manera). Aunque inicialmente no importará, es posible que deba elegir un contexto de ejecución que no seasystem.dispatcher
: todo depende de sidownload
está bloqueando o no.Me disculpo de antemano si este código Java está mal formado: uso Akka con Scala, por lo que todo esto es al mirar la API de Akka Java y la referencia de sintaxis Java.
fuente
En
OutputStream
este caso, es el "valor materializado" delSource
y solo se creará una vez que se ejecute la secuencia (o "materializada" en una secuencia en ejecución). Ejecutarlo está fuera de su control, ya que le entrega elSource
Akka HTTP y luego ejecutará su fuente..mapMaterializedValue(matval -> ...)
generalmente se usa para transformar el valor materializado, pero dado que se invoca como parte de la materialización, puede usarlo para hacer efectos secundarios como enviar el matval en un mensaje, tal como lo ha descubierto, no hay necesariamente nada malo en que incluso si se ve funky. Es importante comprender que la secuencia no completará su materialización y se ejecutará hasta que se complete esa lambda. Esto significa problemas sidownload()
está bloqueando en lugar de perder algo de trabajo en un hilo diferente e inmediatamente regresando.Sin embargo, existe otra solución:
Source.preMaterialize()
materializa la fuente y le brinda una partePair
del valor materializado y una nuevaSource
que puede usarse para consumir la fuente ya iniciada:Tenga en cuenta que hay algunas cosas adicionales en las que debe pensar en su código, lo más importante si la
blobClient.download(os)
llamada se bloquea hasta que se hace y usted lo llama desde el actor, en ese caso, debe asegurarse de que su actor no muera de hambre al despachador y detener otros actores de su aplicación desde la ejecución (consulte los documentos de Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).fuente