Analizar archivo JSON grande en Nodejs

98

Tengo un archivo que almacena muchos objetos JavaScript en formato JSON y necesito leer el archivo, crear cada uno de los objetos y hacer algo con ellos (insertarlos en una base de datos en mi caso). Los objetos JavaScript se pueden representar con un formato:

Formato A:

[{name: 'thing1'},
....
{name: 'thing999999999'}]

o formato B:

{name: 'thing1'}         // <== My choice.
...
{name: 'thing999999999'}

Tenga en cuenta que ...indica muchos objetos JSON. Soy consciente de que podría leer todo el archivo en la memoria y luego usarlo JSON.parse()así:

fs.readFile(filePath, 'utf-8', function (err, fileContents) {
  if (err) throw err;
  console.log(JSON.parse(fileContents));
});

Sin embargo, el archivo podría ser muy grande, preferiría usar una secuencia para lograr esto. El problema que veo con una transmisión es que el contenido del archivo podría dividirse en fragmentos de datos en cualquier momento, entonces, ¿cómo puedo usarlo JSON.parse()en tales objetos?

Idealmente, cada objeto se leería como un fragmento de datos separado, pero no estoy seguro de cómo hacerlo .

var importStream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
importStream.on('data', function(chunk) {

    var pleaseBeAJSObject = JSON.parse(chunk);           
    // insert pleaseBeAJSObject in a database
});
importStream.on('end', function(item) {
   console.log("Woot, imported objects into the database!");
});*/

Tenga en cuenta que deseo evitar leer todo el archivo en la memoria. La eficiencia del tiempo no me importa. Sí, podría intentar leer varios objetos a la vez e insertarlos todos a la vez, pero eso es un ajuste de rendimiento: necesito una forma que esté garantizada para no causar una sobrecarga de memoria, sin importar cuántos objetos estén contenidos en el archivo. .

Puedo elegir usar FormatAo FormatBtal vez algo más, solo especifique en su respuesta. ¡Gracias!

dgh
fuente
Para el formato B, puede analizar el fragmento en busca de nuevas líneas y extraer cada línea completa, concatenando el resto si se corta en el medio. Sin embargo, puede haber una forma más elegante. No he trabajado mucho con streams.
travis

Respuestas:

82

Para procesar un archivo línea por línea, simplemente necesita desacoplar la lectura del archivo y el código que actúa sobre esa entrada. Puede lograr esto almacenando en búfer su entrada hasta que llegue a una nueva línea. Suponiendo que tenemos un objeto JSON por línea (básicamente, formato B):

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var buf = '';

stream.on('data', function(d) {
    buf += d.toString(); // when data is read, stash it in a string buffer
    pump(); // then process the buffer
});

function pump() {
    var pos;

    while ((pos = buf.indexOf('\n')) >= 0) { // keep going while there's a newline somewhere in the buffer
        if (pos == 0) { // if there's more than one newline in a row, the buffer will now start with a newline
            buf = buf.slice(1); // discard it
            continue; // so that the next iteration will start with data
        }
        processLine(buf.slice(0,pos)); // hand off the line
        buf = buf.slice(pos+1); // and slice the processed data off the buffer
    }
}

function processLine(line) { // here's where we do something with a line

    if (line[line.length-1] == '\r') line=line.substr(0,line.length-1); // discard CR (0x0D)

    if (line.length > 0) { // ignore empty lines
        var obj = JSON.parse(line); // parse the JSON
        console.log(obj); // do something with the data here!
    }
}

Cada vez que el flujo de archivos recibe datos del sistema de archivos, se almacena en un búfer y luego pumpse llama.

Si no hay una nueva línea en el búfer, pumpsimplemente regresa sin hacer nada. Se agregarán más datos (y posiblemente una nueva línea) al búfer la próxima vez que la transmisión obtenga datos, y luego tendremos un objeto completo.

Si hay una nueva línea, pumpcorta el búfer desde el principio hasta la nueva línea y se lo pasa a process. Luego vuelve a comprobar si hay otra nueva línea en el búfer (el whilebucle). De esta manera, podemos procesar todas las líneas que se leyeron en el fragmento actual.

Finalmente, processse llama una vez por línea de entrada. Si está presente, elimina el carácter de retorno de carro (para evitar problemas con los finales de línea: LF frente a CRLF) y luego llama a JSON.parsela línea. En este punto, puede hacer lo que necesite con su objeto.

Tenga en cuenta que JSON.parsees estricto sobre lo que acepta como entrada; debe citar sus identificadores y valores de cadena con comillas dobles . En otras palabras, {name:'thing1'}arrojará un error; debe usar {"name":"thing1"}.

Debido a que no habrá más de un fragmento de datos en la memoria a la vez, esto será extremadamente eficiente en la memoria. También será extremadamente rápido. Una prueba rápida mostró que procesé 10,000 filas en menos de 15 ms.

josh3736
fuente
12
Esta respuesta ahora es redundante. Use JSONStream y tendrá soporte listo para usar.
arcseldon
2
El nombre de la función 'proceso' es incorrecto. 'proceso' debería ser una variable de sistema. Este error me confundió durante horas.
Zhigong Li
17
@arcseldon No creo que el hecho de que haya una biblioteca que haga esto haga que esta respuesta sea redundante. Ciertamente, sigue siendo útil saber cómo se puede hacer sin el módulo.
Kevin B
3
No estoy seguro de si esto funcionaría para un archivo json minificado. ¿Qué pasaría si todo el archivo estuviera empaquetado en una sola línea y no fuera posible usar tales delimitadores? ¿Cómo resolvemos entonces este problema?
SLearner
7
Las bibliotecas de terceros no están hechas de magia. Son como esta respuesta, versiones elaboradas de soluciones hechas a mano, pero simplemente empaquetadas y etiquetadas como un programa. Comprender cómo funcionan las cosas es mucho más importante y relevante que arrojar datos a ciegas en una biblioteca esperando resultados. Solo digo :)
zanona
34

Justo cuando estaba pensando que sería divertido escribir un analizador JSON de transmisión, también pensé que tal vez debería hacer una búsqueda rápida para ver si ya hay uno disponible.

Resulta que lo hay.

Como lo acabo de encontrar, obviamente no lo he usado, así que no puedo comentar sobre su calidad, pero me interesaría saber si funciona.

Funciona, considere el siguiente Javascript y _.isString:

stream.pipe(JSONStream.parse('*'))
  .on('data', (d) => {
    console.log(typeof d);
    console.log("isString: " + _.isString(d))
  });

Esto registrará los objetos a medida que ingresen si la secuencia es una matriz de objetos. Por lo tanto, lo único que se almacena en búfer es un objeto a la vez.

usuario1106925
fuente
29

A partir de octubre de 2014 , puede hacer algo como lo siguiente (utilizando JSONStream): https://www.npmjs.org/package/JSONStream

var fs = require('fs'),
    JSONStream = require('JSONStream'),

var getStream() = function () {
    var jsonData = 'myData.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
}

getStream().pipe(MyTransformToDoWhateverProcessingAsNeeded).on('error', function (err) {
    // handle any errors
});

Para demostrar con un ejemplo práctico:

npm install JSONStream event-stream

data.json:

{
  "greeting": "hello world"
}

hola.js:

var fs = require('fs'),
    JSONStream = require('JSONStream'),
    es = require('event-stream');

var getStream = function () {
    var jsonData = 'data.json',
        stream = fs.createReadStream(jsonData, { encoding: 'utf8' }),
        parser = JSONStream.parse('*');
    return stream.pipe(parser);
};

getStream()
    .pipe(es.mapSync(function (data) {
        console.log(data);
    }));
$ node hello.js
// hello world
arcseldon
fuente
2
Esto es en su mayoría cierto y útil, pero creo que debe hacerlo parse('*')o no obtendrá ningún dato.
John Zwinck
@JohnZwinck Gracias, actualicé la respuesta y agregué un ejemplo funcional para demostrarlo completamente.
arcseldon
en el primer bloque de código, se var getStream() = function () {debe eliminar el primer conjunto de paréntesis .
givemesnacks
1
Esto falló con un error de memoria insuficiente con un archivo json de 500 MB.
Keith John Hutchison
18

Me doy cuenta de que desea evitar leer todo el archivo JSON en la memoria si es posible, sin embargo, si tiene la memoria disponible, puede que no sea una mala idea en cuanto al rendimiento. El uso de require () de node.js en un archivo json carga los datos en la memoria muy rápido.

Ejecuté dos pruebas para ver cómo se veía el rendimiento al imprimir un atributo de cada característica de un archivo geojson de 81 MB.

En la primera prueba, leí todo el archivo geojson en la memoria usando var data = require('./geo.json'). Eso tomó 3330 milisegundos y luego imprimir un atributo de cada función tomó 804 milisegundos para un total de 4134 milisegundos. Sin embargo, parecía que node.js estaba usando 411 MB de memoria.

En la segunda prueba, utilicé la respuesta de @ arcseldon con JSONStream + event-stream. Modifiqué la consulta JSONPath para seleccionar solo lo que necesitaba. Esta vez la memoria nunca superó los 82 MB, sin embargo, ¡ahora todo tardó 70 segundos en completarse!

Evan Siroky
fuente
18

Tenía un requisito similar, necesito leer un archivo json grande en el nodo js y procesar datos en fragmentos y llamar a una api y guardar en mongodb. inputFile.json es como:

{
 "customers":[
       { /*customer data*/},
       { /*customer data*/},
       { /*customer data*/}....
      ]
}

Ahora utilicé JsonStream y EventStream para lograr esto sincrónicamente.

var JSONStream = require("JSONStream");
var es = require("event-stream");

fileStream = fs.createReadStream(filePath, { encoding: "utf8" });
fileStream.pipe(JSONStream.parse("customers.*")).pipe(
  es.through(function(data) {
    console.log("printing one customer object read from file ::");
    console.log(data);
    this.pause();
    processOneCustomer(data, this);
    return data;
  }),
  function end() {
    console.log("stream reading ended");
    this.emit("end");
  }
);

function processOneCustomer(data, es) {
  DataModel.save(function(err, dataModel) {
    es.resume();
  });
}
karthick N
fuente
Muchas gracias por agregar su respuesta, mi caso también necesitaba un manejo sincrónico. Sin embargo, después de probar, no me fue posible llamar a "end ()" como una devolución de llamada después de que la tubería haya terminado. Creo que lo único que se podría hacer es agregar un evento, lo que debería suceder después de que la transmisión esté 'finalizada' / 'cerrada' con ´fileStream.on ('close', ...) ´.
nonNumericalFloat
6

Escribí un módulo que puede hacer esto, llamado BFJ . Específicamente, el método bfj.matchse puede usar para dividir un flujo grande en fragmentos discretos de JSON:

const bfj = require('bfj');
const fs = require('fs');

const stream = fs.createReadStream(filePath);

bfj.match(stream, (key, value, depth) => depth === 0, { ndjson: true })
  .on('data', object => {
    // do whatever you need to do with object
  })
  .on('dataError', error => {
    // a syntax error was found in the JSON
  })
  .on('error', error => {
    // some kind of operational error occurred
  })
  .on('end', error => {
    // finished processing the stream
  });

Aquí, bfj.matchdevuelve una secuencia legible en modo objeto que recibirá los elementos de datos analizados y se le pasan 3 argumentos:

  1. Una secuencia legible que contiene el JSON de entrada.

  2. Un predicado que indica qué elementos del JSON analizado se enviarán al flujo de resultados.

  3. Un objeto de opciones que indica que la entrada es JSON delimitado por saltos de línea (esto es para procesar el formato B de la pregunta, no es necesario para el formato A).

Al ser llamado, bfj.matchanalizará JSON desde el flujo de entrada en profundidad primero, llamando al predicado con cada valor para determinar si enviar o no ese elemento al flujo de resultados. Al predicado se le pasan tres argumentos:

  1. La clave de propiedad o el índice de matriz (esto será undefinedpara elementos de nivel superior).

  2. El valor en sí.

  3. La profundidad del elemento en la estructura JSON (cero para elementos de nivel superior).

Por supuesto, también se puede usar un predicado más complejo según sea necesario según los requisitos. También puede pasar una cadena o una expresión regular en lugar de una función de predicado, si desea realizar coincidencias simples con claves de propiedad.

Phil Booth
fuente
4

Resolví este problema usando el módulo split npm . Canalice su transmisión en división, y "dividirá una transmisión y la volverá a ensamblar para que cada línea sea un fragmento ".

Código de muestra:

var fs = require('fs')
  , split = require('split')
  ;

var stream = fs.createReadStream(filePath, {flags: 'r', encoding: 'utf-8'});
var lineStream = stream.pipe(split());
linestream.on('data', function(chunk) {
    var json = JSON.parse(chunk);           
    // ...
});
Brian Leathem
fuente
4

Si tiene control sobre el archivo de entrada y es una matriz de objetos, puede resolver esto más fácilmente. Organice la salida del archivo con cada registro en una línea, así:

[
   {"key": value},
   {"key": value},
   ...

Este sigue siendo JSON válido.

Luego, use el módulo readline de node.js para procesarlos una línea a la vez.

var fs = require("fs");

var lineReader = require('readline').createInterface({
    input: fs.createReadStream("input.txt")
});

lineReader.on('line', function (line) {
    line = line.trim();

    if (line.charAt(line.length-1) === ',') {
        line = line.substr(0, line.length-1);
    }

    if (line.charAt(0) === '{') {
        processRecord(JSON.parse(line));
    }
});

function processRecord(record) {
    // Process the records one at a time here! 
}
Steve Hanov
fuente
-1

Creo que necesitas usar una base de datos. MongoDB es una buena opción en este caso porque es compatible con JSON.

ACTUALIZACIÓN : puede usar la herramienta mongoimport para importar datos JSON en MongoDB.

mongoimport --collection collection --file collection.json
Vadim Baryshev
fuente
1
Esto no responde a la pregunta. Tenga en cuenta que la segunda línea de la pregunta dice que quiere hacer esto para obtener datos en una base de datos .
josh3736
mongoimport solo importa archivos de hasta 16 MB.
Haziq Ahmed