Deserialización asincrónica de una lista usando System.Text.Json

11

Digamos que solicito un gran archivo json que contiene una lista de muchos objetos. No quiero que estén en la memoria de una vez, pero prefiero leerlos y procesarlos uno por uno. Entonces necesito convertir una System.IO.Streamtransmisión asíncrona en un IAsyncEnumerable<T>. ¿Cómo uso la nueva System.Text.JsonAPI para hacer esto?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}
Rick de Water
fuente
1
Probablemente necesitará algo como el método DeserializeAsync
Pavel Anikhouski el
2
Lo sentimos, parece que el método anterior carga todo el flujo en la memoria. Usted puede leer los datos por trozos utilizando asynchonously Utf8JsonReader, por favor, eche un vistazo a algunas de GitHub muestras y en existente hilo así
Pavel Anikhouski
GetAsyncpor sí solo regresa cuando se recibe la respuesta completa . En su lugar, debe usar SendAsynccon `HttpCompletionOption.ResponseContentRead`. Una vez que tenga eso, puede usar JsonTextReader de JSON.NET . Usar System.Text.Jsonpara esto no es tan fácil como muestra este problema . La funcionalidad no está disponible y su implementación en una asignación baja usando estructuras no es trivial
Panagiotis Kanavos
El problema con la deserialización en fragmentos es que debe saber cuándo tiene un fragmento completo para deserializar. Esto sería difícil de lograr limpiamente para casos generales. Se requeriría un análisis previo, lo que podría ser una compensación bastante pobre en términos de rendimiento. Sería bastante difícil generalizar. Pero si aplica sus propias restricciones en su JSON, diga "un solo objeto ocupa exactamente 20 líneas en el archivo", entonces esencialmente podría deserializar asincrónicamente al leer el archivo en fragmentos asíncronos. Sin embargo, me imagino que necesitaría un JSON masivo para ver el beneficio aquí.
DetectivePikachu
Parece que alguien ya respondió una pregunta similar aquí con el código completo.
Panagiotis Kanavos

Respuestas:

4

Sí, un serializador JSON (de) verdaderamente de transmisión sería una buena mejora de rendimiento, en muchos lugares.

Lamentablemente, System.Text.Jsonno hace esto en este momento. No estoy seguro de si lo hará en el futuro, ¡eso espero! Verdaderamente, la deserialización de JSON en streaming resulta ser bastante desafiante.

Podrías comprobar si el extremadamente rápido Utf8Json lo admite, tal vez.

Sin embargo, puede haber una solución personalizada para su situación específica, ya que sus requisitos parecen limitar la dificultad.

La idea es leer manualmente un elemento de la matriz a la vez. Estamos haciendo uso del hecho de que cada elemento de la lista es, en sí mismo, un objeto JSON válido.

Puede omitir manualmente el [(para el primer elemento) o el ,(para cada elemento siguiente). Entonces creo que su mejor opción es usar .NET Core Utf8JsonReaderpara determinar dónde termina el objeto actual y alimentar los bytes escaneados JsonDeserializer.

De esta manera, solo está almacenando un poco en un búfer a la vez.

Y como estamos hablando de rendimiento, puede obtener la entrada de a PipeReader, mientras lo hace. :-)

Timo
fuente
No se trata del rendimiento en absoluto. No se trata de deserialización asíncrona, que ya lo hace. Se trata de acceso de transmisión: procesar elementos JSON a medida que se analizan a partir de la transmisión, como lo hace JsonTextReader de JSON.NET.
Panagiotis Kanavos
La clase relevante en Utf8Json es JsonReader y, como dice el autor, es raro. JsonTextReader de JSON.NET y Utf8JsonReader de System.Text.Json comparten la misma rareza: debe recorrer y verificar el tipo de elemento actual a medida que avanza.
Panagiotis Kanavos
@PanagiotisKanavos Ah, sí, streaming. ¡Esa es la palabra que estaba buscando! Estoy actualizando la palabra "asíncrono" a "streaming". Creo que la razón para querer transmitir es limitar el uso de memoria, lo cual es un problema de rendimiento. Quizás OP pueda confirmar.
Timo
El rendimiento no significa velocidad. No importa qué tan rápido sea el deserializador, si tiene que procesar elementos 1M, no desea almacenarlos en la RAM, ni esperar a que todos se deserialicen antes de poder procesar el primero.
Panagiotis Kanavos
Semántica, mi amigo! Me alegra que estemos tratando de lograr lo mismo después de todo.
Timo
4

TL; DR No es trivial


Parece que alguien ya ha publicado el código completo de una Utf8JsonStreamReaderestructura que lee buffers de una secuencia y los alimenta a un Utf8JsonRreader, lo que permite una fácil deserialización con JsonSerializer.Deserialize<T>(ref newJsonReader, options);. El código tampoco es trivial. La pregunta relacionada está aquí y la respuesta está aquí .

Sin embargo, eso no es suficiente: HttpClient.GetAsyncregresará solo después de que se reciba la respuesta completa, esencialmente almacenando todo en la memoria intermedia.

Para evitar esto, se debe usar HttpClient.GetAsync (string, HttpCompletionOption) con HttpCompletionOption.ResponseHeadersRead.

El ciclo de deserialización también debe verificar el token de cancelación, y salir o lanzar si está señalado. De lo contrario, el ciclo continuará hasta que se reciba y procese toda la transmisión.

Este código se basa en el ejemplo de la respuesta relacionada y usa HttpCompletionOption.ResponseHeadersReady verifica el token de cancelación. Puede analizar cadenas JSON que contienen una matriz adecuada de elementos, por ejemplo:

[{"prop1":123},{"prop1":234}]

La primera llamada a se jsonStreamReader.Read()mueve al inicio de la matriz, mientras que la segunda se mueve al comienzo del primer objeto. El bucle termina cuando ]se detecta el final de la matriz ( ).

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

Fragmentos JSON, también conocido como streaming JSON aka ... *

Es bastante común en escenarios de transmisión o registro de eventos agregar objetos JSON individuales a un archivo, un elemento por línea, por ejemplo:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

Este no es un documento JSON válido , pero los fragmentos individuales son válidos. Esto tiene varias ventajas para big data / escenarios altamente concurrentes. Agregar un nuevo evento solo requiere agregar una nueva línea al archivo, no analizar y reconstruir todo el archivo. El procesamiento , especialmente el procesamiento paralelo , es más fácil por dos razones:

  • Los elementos individuales se pueden recuperar de uno en uno, simplemente leyendo una línea de una secuencia.
  • El archivo de entrada puede dividirse fácilmente y dividirse a través de los límites de la línea, alimentando cada parte a un proceso de trabajo separado, por ejemplo, en un clúster de Hadoop, o simplemente diferentes hilos en una aplicación: calcule los puntos de división, por ejemplo, dividiendo la longitud por el número de trabajadores , luego busque la primera línea nueva. Alimente todo hasta ese punto a un trabajador separado.

Usando un StreamReader

La forma de asignar y para hacer esto sería usar un TextReader, leer una línea a la vez y analizarlo con JsonSerializer .

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

Eso es mucho más simple que el código que deserializa una matriz adecuada. Hay dos problemas:

  • ReadLineAsync no acepta un token de cancelación
  • Cada iteración asigna una nueva cadena, una de las cosas que queríamos evitar al usar System.Text.Json

Sin embargo, esto puede ser suficiente ya que tratar de producir los ReadOnlySpan<Byte>búferes que necesita JsonSerializer. Deserializar no es trivial.

Tuberías y secuenciador

Para evitar todas las ubicaciones, necesitamos obtener una ReadOnlySpan<byte>de la transmisión. Hacer esto requiere el uso de tuberías System.IO.Pipeline y la estructura SequenceReader . La Introducción a SequenceReader de Steve Gordon explica cómo se puede usar esta clase para leer datos de una secuencia utilizando delimitadores.

Desafortunadamente, SequenceReaderes una estructura de referencia, lo que significa que no se puede usar en métodos asíncronos o locales. Es por eso que Steve Gordon en su artículo crea un

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

El método para leer elementos forma una secuencia ReadOnlySequence y devuelve la posición final, de modo que PipeReader pueda reanudarla. Desafortunadamente , queremos devolver un IEnumerable o IAsyncEnumerable, y a los métodos iteradores tampoco les gustan inni los outparámetros.

Podríamos recopilar los elementos deserializados en una Lista o Cola y devolverlos como un solo resultado, pero eso aún asignaría listas, buffers o nodos y tendría que esperar a que todos los elementos en un buffer se deserialicen antes de devolver:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

Necesitamos algo que actúe como un enumerable sin requerir un método iterador, que funcione con asíncrono y que no proteja todo de la manera.

Agregar canales para producir un IAsyncEnumerable

ChannelReader.ReadAllAsync devuelve un IAsyncEnumerable. Podemos devolver un ChannelReader a partir de métodos que no podrían funcionar como iteradores y aún producir una secuencia de elementos sin almacenamiento en caché.

Adaptando el código de Steve Gordon para usar canales, obtenemos los ReadItems (ChannelWriter ...) y los ReadLastItemmétodos. El primero, lee un elemento a la vez, hasta una nueva línea usando ReadOnlySpan<byte> itemBytes. Esto puede ser usado por JsonSerializer.Deserialize. Si ReadItemsno puede encontrar el delimitador, devuelve su posición para que PipelineReader pueda extraer el siguiente fragmento de la secuencia.

Cuando llegamos al último fragmento y no hay otro delimitador, ReadLastItem` lee los bytes restantes y los deserializa.

El código es casi idéntico al de Steve Gordon. En lugar de escribir en la consola, escribimos en ChannelWriter.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

El DeserializeToChannel<T>método crea un lector de canalización en la parte superior de la secuencia, crea un canal e inicia una tarea de trabajo que analiza fragmentos y los empuja al canal:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync()se puede usar para consumir todos los artículos a través de IAsyncEnumerable<T>:

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    
Panagiotis Kanavos
fuente
0

Parece que necesita implementar su propio lector de flujo. Debe leer los bytes uno por uno y detenerse tan pronto como se complete la definición del objeto. De hecho, es bastante bajo nivel. Como tal, NO cargará todo el archivo en la RAM, sino que tomará la parte con la que está tratando. ¿Parece ser una respuesta?

Sereja Bogolubov
fuente
-2

¿Quizás podrías usar Newtonsoft.Jsonserializador? https://www.newtonsoft.com/json/help/html/Performance.htm

Especialmente ver sección:

Optimizar el uso de memoria

Editar

Podría intentar deserializar valores de JsonTextReader, p. Ej.

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}
Miłosz Wieczorek
fuente
Eso no responde la pregunta. No se trata de rendimiento en absoluto, se trata de acceso de transmisión sin cargar todo en la memoria
Panagiotis Kanavos
¿Has abierto el enlace relacionado o acabas de decir lo que piensas? En el enlace que envié en la sección que mencioné, hay un fragmento de código de cómo deserializar JSON de la transmisión.
Miłosz Wieczorek
Lea la pregunta nuevamente, el OP pregunta cómo procesar los elementos sin deserializar todo en la memoria. No solo lee de una transmisión, sino que solo procesa lo que proviene de la transmisión. I don't want them to be in memory all at once, but I would rather read and process them one by one.La clase relevante en JSON.NET es JsonTextReader.
Panagiotis Kanavos
En cualquier caso, una respuesta de solo enlace no se considera una buena respuesta, y nada en ese enlace responde a la pregunta del OP. Un enlace a JsonTextReader sería mejor
Panagiotis Kanavos