Analizar archivos de registro enormes en Node.js: leer línea por línea

126

Necesito hacer un análisis de archivos de registro grandes (5-10 Gb) en Javascript / Node.js (estoy usando Cube).

El logline se parece a:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

Hay que leer cada línea, hacer un poco de análisis (por ejemplo, pelar hacia fuera 5, 7y SUCCESS), entonces bombear estos datos en Cubo ( https://github.com/square/cube ) usando su cliente de JS.

En primer lugar, ¿cuál es la forma canónica en Node de leer un archivo, línea por línea?

Parece ser una pregunta bastante común en línea:

Muchas de las respuestas parecen apuntar a un montón de módulos de terceros:

Sin embargo, esto parece una tarea bastante básica; seguramente, hay una forma simple dentro de stdlib para leer en un archivo de texto, línea por línea.

En segundo lugar, necesito procesar cada línea (por ejemplo, convertir la marca de tiempo en un objeto de fecha y extraer campos útiles).

¿Cuál es la mejor manera de hacer esto, maximizando el rendimiento? ¿Hay alguna forma que no bloquee la lectura de cada línea o el envío a Cube?

En tercer lugar, supongo que usar divisiones de cadenas, y el equivalente en JS de contains (IndexOf! = -1?) Será mucho más rápido que las expresiones regulares. ¿Alguien ha tenido mucha experiencia en analizar cantidades masivas de datos de texto en Node.js?

Saludos, Victor

victorhooi
fuente
Construí un analizador de registros en el nodo que toma un montón de cadenas de expresiones regulares con 'capturas' integradas y salidas a JSON. Incluso puede llamar a funciones en cada captura si desea hacer un cálculo. Puede hacer lo que quieras: npmjs.org/package/logax
Jess

Respuestas:

209

Busqué una solución para analizar archivos muy grandes (gbs) línea por línea usando una secuencia. Todas las bibliotecas y ejemplos de terceros no se adaptaron a mis necesidades, ya que procesaron los archivos no línea por línea (como 1, 2, 3, 4 ..) o leyeron todo el archivo en la memoria.

La siguiente solución puede analizar archivos muy grandes, línea por línea, usando stream & pipe. Para las pruebas utilicé un archivo de 2,1 gb con 17.000.000 registros. El uso de RAM no superó los 60 mb.

Primero, instale el paquete event-stream :

npm install event-stream

Luego:

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

ingrese la descripción de la imagen aquí

¡Por favor déjame saber cómo va!

Gerard
fuente
6
FYI, este código no es síncrono. Es asincrónico. Si inserta console.log(lineNr)después de la última línea de su código, no mostrará el recuento de líneas final porque el archivo se lee de forma asincrónica.
jfriend00
4
Gracias, esta fue la única solución que pude encontrar que en realidad se detuvo y se reanudó cuando se suponía que debía hacerlo. Readline no lo hizo.
Brent
3
Impresionante ejemplo, y realmente se detiene. Además, si usted decide dejar de leer el archivo temprana puede utilizars.end();
zipzit
2
Trabajado como un encanto. Se usó para indexar 150 millones de documentos en el índice de búsqueda elástica. readlinemódulo es una molestia. No se detiene y estaba causando fallas cada vez después de 40-50 millones. Perdido un día. Muchas gracias por la respuesta. Esta funciona perfectamente
Mandeep Singh
3
event-stream fue comprometido: medium.com/intrinsic/… pero 4+ es aparentemente seguro blog.npmjs.org/post/180565383195/…
John Vandivier
72

Puede utilizar el readlinepaquete incorporado , consulte los documentos aquí . Utilizo stream para crear un nuevo flujo de salida.

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});

Los archivos grandes tardarán algún tiempo en procesarse. Dime si funciona.

usuario568109
fuente
2
Como está escrito, la penúltima línea falla porque cubestuff no está definido.
Greg
2
Usando readline, ¿es posible pausar / reanudar el flujo de lectura para realizar acciones asíncronas en el área "hacer cosas"?
jchook
1
@jchook readlineme estaba dando muchos problemas cuando intenté pausar / reanudar. No pausa la transmisión correctamente, lo que crea muchos problemas si el proceso descendente es más lento
Mandeep Singh
31

Realmente me gustó la respuesta de @gerard, que en realidad merece ser la respuesta correcta aquí. Hice algunas mejoras:

  • El código está en una clase (modular)
  • El análisis está incluido
  • La capacidad de reanudar se da al exterior en caso de que haya un trabajo asincrónico que esté encadenado a la lectura del CSV, como insertarlo en la base de datos, o una solicitud HTTP
  • Lectura en trozos / tamaños de lotes que el usuario puede declarar. También me encargué de la codificación en la transmisión, en caso de que tenga archivos con una codificación diferente.

Aquí está el código:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader

Básicamente, así es como lo usará:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

Probé esto con un archivo CSV de 35GB y funcionó para mí y es por eso que elegí construirlo en la respuesta de @gerard , los comentarios son bienvenidos.

ambodi
fuente
cuanto tiempo tomo
Z. Khullah
Aparentemente, esto carece de pause()llamada, ¿no?
Vanuan
Además, esto no llama a la función de devolución de llamada al final. Entonces, si batchSize es 100, el tamaño de los archivos es 150, solo se procesarán 100 elementos. ¿Me equivoco?
Vanuan
16

Usé https://www.npmjs.com/package/line-by-line para leer más de 1 000 000 de líneas de un archivo de texto. En este caso, la capacidad de RAM ocupada era de 50 a 60 megabytes.

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });
Eugene Ilyushin
fuente
'línea por línea' es más eficiente en memoria que la respuesta seleccionada. Para 1 millón de líneas en un csv, la respuesta seleccionada tenía mi proceso de nodo en los 800 megabytes bajos. Usando 'línea por línea', fue consistentemente en los 700 bajos. Este módulo también mantiene el código limpio y fácil de leer. En total, tendré que leer alrededor de 18 millones, ¡así que cada mb cuenta!
Neo
Es una pena que esto use su propia 'línea' de eventos en lugar del 'trozo' estándar, lo que significa que no podrá usar 'tubería'.
Rene Wooller
Después de horas de pruebas y búsquedas, esta es la única solución que realmente se detiene en el lr.cancel()método. Lee las primeras 1000 líneas de un archivo de 5 Gig en 1 ms. ¡¡¡¡Increíble!!!!
Perez Lamed van Niekerk
6

Además de leer el archivo grande línea por línea, también puede leerlo fragmento a fragmento. Para obtener más información, consulte este artículo.

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
Kris Roofe
fuente
¿Podría ser que lo siguiente debería ser una comparación en lugar de una asignación if(bytesRead = chunkSize):?
Stefan Rein
4

La documentación de Node.js ofrece un ejemplo muy elegante utilizando el módulo Readline.

Ejemplo: lectura del flujo de archivos línea por línea

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});

Nota: usamos la opción crlfDelay para reconocer todas las instancias de CR LF ('\ r \ n') como un solo salto de línea.

Jaime Gómez
fuente
3

Tuve el mismo problema todavía. Después de comparar varios módulos que parecen tener esta característica, decidí hacerlo yo mismo, es más simple de lo que pensaba.

esencia: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

Cubre el archivo abierto en un cierre, que fetchBlock()devuelto buscará un bloque del archivo, finaliza la división en la matriz (se ocupará del segmento de la última búsqueda).

Establecí el tamaño del bloque en 1024 para cada operación de lectura. Esto puede tener errores, pero la lógica del código es obvia, pruébelo usted mismo.

Demstone
fuente
2

Nodo por línea utiliza secuencias, por lo que preferiría esa para sus archivos enormes.

para sus conversiones de fecha, usaría moment.js .

para maximizar su rendimiento, podría pensar en utilizar un clúster de software. hay algunos módulos agradables que encajan bastante bien con el módulo de clúster nativo del nodo. me gusta cluster-master de isaacs. por ejemplo, podría crear un grupo de x trabajadores que calculan un archivo.

para comparar divisiones frente a expresiones regulares, use benchmark.js . No lo he probado hasta ahora. benchmark.js está disponible como módulo de nodo

aquí y ahora78
fuente
2

Basándome en esta respuesta a las preguntas, implementé una clase que puede usar para leer un archivo sincrónicamente línea por línea fs.readSync(). Puede hacer esta "pausa" y "reanudar" mediante una Qpromesa ( jQueryparece requerir un DOM, por lo que no puede ejecutarlo nodejs):

var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {      
  this.moreLinesAvailable = true;
  this.fd = undefined;
  this.bufferSize = 1024*1024;
  this.buffer = new Buffer(this.bufferSize);
  this.leftOver = '';

  this.read = undefined;
  this.idxStart = undefined;
  this.idx = undefined;

  this.lineNumber = 0;

  this._bundleOfLines = [];

  this.open = function() {
    this.fd = fs.openSync(filename, 'r');
  };

  this.readNextLine = function () {
    if (this._bundleOfLines.length === 0) {
      this._readNextBundleOfLines();
    }
    this.lineNumber++;
    var lineToReturn = this._bundleOfLines[0];
    this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
    return lineToReturn;
  };

  this.getLineNumber = function() {
    return this.lineNumber;
  };

  this._readNextBundleOfLines = function() {
    var line = "";
    while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
      this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
      this.idxStart = 0
      while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
        line = this.leftOver.substring(this.idxStart, this.idx);
        this._bundleOfLines.push(line);        
        this.idxStart = this.idx + 1;
      }
      this.leftOver = this.leftOver.substring(this.idxStart);
      if (line !== "") {
        break;
      }
    }
  }; 
}
Benvorth
fuente
0
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
  [s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
  protected file: string;
  protected csvOptions = {
    delimiter: ',',
    headers: true,
    ignoreEmpty: true,
    trim: true
  };
  constructor(file: string, csvOptions = {}) {
    if (!fs.existsSync(file)) {
      throw new Error(`File ${file} not found.`);
    }
    this.file = file;
    this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
  }
  public read(callback: RowCallBack): Promise < Array < object >> {
    return new Promise < Array < object >> (resolve => {
      const readStream = fs.createReadStream(this.file);
      const results: Array < any > = [];
      let index = 0;
      const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
        index++;
        results.push(await callback(data, index));
      }).on('error', (err: Error) => {
        console.error(err.message);
        throw err;
      }).on('end', () => {
        resolve(results);
      });
      readStream.pipe(csvStream);
    });
  }
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
  const reader = new CSVReader('./database/migrations/csv/users.csv');
  const users = await reader.read(async data => {
    return {
      username: data.username,
      name: data.name,
      email: data.email,
      cellPhone: data.cell_phone,
      homePhone: data.home_phone,
      roleId: data.role_id,
      description: data.description,
      state: data.state,
    };
  });
  console.log(users);
})();
Raza
fuente
-1

He creado un módulo de nodo para leer archivos grandes de forma asincrónica, texto o JSON. Probado en archivos grandes.

var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');

module.exports = FileReader;

function FileReader(){

}

FileReader.prototype.read = function(pathToFile, callback){
    var returnTxt = '';
    var s = fs.createReadStream(pathToFile)
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        //console.log('reading line: '+line);
        returnTxt += line;        

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(){
        console.log('Error while reading file.');
    })
    .on('end', function(){
        console.log('Read entire file.');
        callback(returnTxt);
    })
);
};

FileReader.prototype.readJSON = function(pathToFile, callback){
    try{
        this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
    }
    catch(err){
        throw new Error('json file is not valid! '+err.stack);
    }
};

Simplemente guarde el archivo como file-reader.js y utilícelo así:

var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
Eyal Zoref
fuente
7
Parece que copiaste de la respuesta de Gerard. Debes darle crédito a Gerard por la parte que copiaste.
Paul Lynch