Digamos que solicito un gran archivo json que contiene una lista de muchos objetos. No quiero que estén en la memoria de una vez, pero prefiero leerlos y procesarlos uno por uno. Entonces necesito convertir una System.IO.Stream
transmisión asíncrona en un IAsyncEnumerable<T>
. ¿Cómo uso la nueva System.Text.Json
API para hacer esto?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
c#
.net-core
.net-core-3.0
c#-8.0
system.text.json
Rick de Water
fuente
fuente
Utf8JsonReader
, por favor, eche un vistazo a algunas de GitHub muestras y en existente hilo asíGetAsync
por sí solo regresa cuando se recibe la respuesta completa . En su lugar, debe usarSendAsync
con `HttpCompletionOption.ResponseContentRead`. Una vez que tenga eso, puede usar JsonTextReader de JSON.NET . UsarSystem.Text.Json
para esto no es tan fácil como muestra este problema . La funcionalidad no está disponible y su implementación en una asignación baja usando estructuras no es trivialRespuestas:
Sí, un serializador JSON (de) verdaderamente de transmisión sería una buena mejora de rendimiento, en muchos lugares.
Lamentablemente,
System.Text.Json
no hace esto en este momento. No estoy seguro de si lo hará en el futuro, ¡eso espero! Verdaderamente, la deserialización de JSON en streaming resulta ser bastante desafiante.Podrías comprobar si el extremadamente rápido Utf8Json lo admite, tal vez.
Sin embargo, puede haber una solución personalizada para su situación específica, ya que sus requisitos parecen limitar la dificultad.
La idea es leer manualmente un elemento de la matriz a la vez. Estamos haciendo uso del hecho de que cada elemento de la lista es, en sí mismo, un objeto JSON válido.
Puede omitir manualmente el
[
(para el primer elemento) o el,
(para cada elemento siguiente). Entonces creo que su mejor opción es usar .NET CoreUtf8JsonReader
para determinar dónde termina el objeto actual y alimentar los bytes escaneadosJsonDeserializer
.De esta manera, solo está almacenando un poco en un búfer a la vez.
Y como estamos hablando de rendimiento, puede obtener la entrada de a
PipeReader
, mientras lo hace. :-)fuente
TL; DR No es trivial
Parece que alguien ya ha publicado el código completo de una
Utf8JsonStreamReader
estructura que lee buffers de una secuencia y los alimenta a un Utf8JsonRreader, lo que permite una fácil deserialización conJsonSerializer.Deserialize<T>(ref newJsonReader, options);
. El código tampoco es trivial. La pregunta relacionada está aquí y la respuesta está aquí .Sin embargo, eso no es suficiente:
HttpClient.GetAsync
regresará solo después de que se reciba la respuesta completa, esencialmente almacenando todo en la memoria intermedia.Para evitar esto, se debe usar HttpClient.GetAsync (string, HttpCompletionOption) con
HttpCompletionOption.ResponseHeadersRead
.El ciclo de deserialización también debe verificar el token de cancelación, y salir o lanzar si está señalado. De lo contrario, el ciclo continuará hasta que se reciba y procese toda la transmisión.
Este código se basa en el ejemplo de la respuesta relacionada y usa
HttpCompletionOption.ResponseHeadersRead
y verifica el token de cancelación. Puede analizar cadenas JSON que contienen una matriz adecuada de elementos, por ejemplo:La primera llamada a se
jsonStreamReader.Read()
mueve al inicio de la matriz, mientras que la segunda se mueve al comienzo del primer objeto. El bucle termina cuando]
se detecta el final de la matriz ( ).Fragmentos JSON, también conocido como streaming JSON aka ... *
Es bastante común en escenarios de transmisión o registro de eventos agregar objetos JSON individuales a un archivo, un elemento por línea, por ejemplo:
Este no es un documento JSON válido , pero los fragmentos individuales son válidos. Esto tiene varias ventajas para big data / escenarios altamente concurrentes. Agregar un nuevo evento solo requiere agregar una nueva línea al archivo, no analizar y reconstruir todo el archivo. El procesamiento , especialmente el procesamiento paralelo , es más fácil por dos razones:
Usando un StreamReader
La forma de asignar y para hacer esto sería usar un TextReader, leer una línea a la vez y analizarlo con JsonSerializer .
Eso es mucho más simple que el código que deserializa una matriz adecuada. Hay dos problemas:
ReadLineAsync
no acepta un token de cancelaciónSin embargo, esto puede ser suficiente ya que tratar de producir los
ReadOnlySpan<Byte>
búferes que necesita JsonSerializer. Deserializar no es trivial.Tuberías y secuenciador
Para evitar todas las ubicaciones, necesitamos obtener una
ReadOnlySpan<byte>
de la transmisión. Hacer esto requiere el uso de tuberías System.IO.Pipeline y la estructura SequenceReader . La Introducción a SequenceReader de Steve Gordon explica cómo se puede usar esta clase para leer datos de una secuencia utilizando delimitadores.Desafortunadamente,
SequenceReader
es una estructura de referencia, lo que significa que no se puede usar en métodos asíncronos o locales. Es por eso que Steve Gordon en su artículo crea unEl método para leer elementos forma una secuencia ReadOnlySequence y devuelve la posición final, de modo que PipeReader pueda reanudarla. Desafortunadamente , queremos devolver un IEnumerable o IAsyncEnumerable, y a los métodos iteradores tampoco les gustan
in
ni losout
parámetros.Podríamos recopilar los elementos deserializados en una Lista o Cola y devolverlos como un solo resultado, pero eso aún asignaría listas, buffers o nodos y tendría que esperar a que todos los elementos en un buffer se deserialicen antes de devolver:
Necesitamos algo que actúe como un enumerable sin requerir un método iterador, que funcione con asíncrono y que no proteja todo de la manera.
Agregar canales para producir un IAsyncEnumerable
ChannelReader.ReadAllAsync devuelve un IAsyncEnumerable. Podemos devolver un ChannelReader a partir de métodos que no podrían funcionar como iteradores y aún producir una secuencia de elementos sin almacenamiento en caché.
Adaptando el código de Steve Gordon para usar canales, obtenemos los ReadItems (ChannelWriter ...) y los
ReadLastItem
métodos. El primero, lee un elemento a la vez, hasta una nueva línea usandoReadOnlySpan<byte> itemBytes
. Esto puede ser usado porJsonSerializer.Deserialize
. SiReadItems
no puede encontrar el delimitador, devuelve su posición para que PipelineReader pueda extraer el siguiente fragmento de la secuencia.Cuando llegamos al último fragmento y no hay otro delimitador, ReadLastItem` lee los bytes restantes y los deserializa.
El código es casi idéntico al de Steve Gordon. En lugar de escribir en la consola, escribimos en ChannelWriter.
El
DeserializeToChannel<T>
método crea un lector de canalización en la parte superior de la secuencia, crea un canal e inicia una tarea de trabajo que analiza fragmentos y los empuja al canal:ChannelReader.ReceiveAllAsync()
se puede usar para consumir todos los artículos a través deIAsyncEnumerable<T>
:fuente
Parece que necesita implementar su propio lector de flujo. Debe leer los bytes uno por uno y detenerse tan pronto como se complete la definición del objeto. De hecho, es bastante bajo nivel. Como tal, NO cargará todo el archivo en la RAM, sino que tomará la parte con la que está tratando. ¿Parece ser una respuesta?
fuente
¿Quizás podrías usar
Newtonsoft.Json
serializador? https://www.newtonsoft.com/json/help/html/Performance.htmEspecialmente ver sección:
Editar
Podría intentar deserializar valores de JsonTextReader, p. Ej.
fuente
I don't want them to be in memory all at once, but I would rather read and process them one by one.
La clase relevante en JSON.NET es JsonTextReader.