Canalice una transmisión a s3.upload ()

89

Actualmente estoy haciendo uso de un complemento de node.js llamado s3-upload-stream para transmitir archivos muy grandes a Amazon S3. Utiliza la API de varias partes y, en su mayor parte, funciona muy bien.

Sin embargo, este módulo muestra su antigüedad y ya tuve que hacerle modificaciones (el autor también lo ha desaprobado). Hoy me encontré con otro problema con Amazon, y realmente me gustaría tomar la recomendación del autor y comenzar a usar el aws-sdk oficial para realizar mis cargas.

PERO.

El SDK oficial no parece admitir la conexión a s3.upload(). La naturaleza de s3.upload es que debe pasar la secuencia legible como argumento al constructor de S3.

Tengo aproximadamente más de 120 módulos de código de usuario que procesan varios archivos y son independientes del destino final de su salida. El motor les entrega un flujo de salida escribible y canalizable, y lo canalizan. No puedo entregarles un AWS.S3objeto y pedirles que lo llamen upload()sin agregar código a todos los módulos. La razón por la que utilicé s3-upload-streamfue porque soportaba tuberías.

¿Hay alguna manera de hacer que aws-sdk sea s3.upload()algo a lo que pueda canalizar la transmisión?

womp
fuente

Respuestas:

132

Envuelva la upload()función S3 con la stream.PassThrough()secuencia node.js.

He aquí un ejemplo:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}
Casey Benko
fuente
2
Genial, esto resolvió mi truco muy feo = -) ¿Puedes explicar qué hace realmente el stream.PassThrough ()?
mraxus
6
¿Se cierra su flujo PassThrough cuando hace esto? Me lo estoy pasando en grande propiciando el cierre en s3.upload para golpear mi transmisión PassThrough.
four43
7
el tamaño del archivo cargado es de 0 bytes. Si canalizo los mismos datos desde el flujo de origen al sistema de archivos, todo funciona bien. ¿Alguna idea?
Radar155
3
Un flujo de paso a través tomará los bytes escritos y los generará. Esto le permite devolver un flujo de escritura que aws-sdk leerá mientras escribe en él. También devolvería el objeto de respuesta de s3.upload () porque de lo contrario no puede asegurarse de que se complete la carga.
reconbot
1
¿De dónde vienen los s3parámetros dentro de la tubería stream?
Blackjack
94

Una respuesta un poco tardía, podría ayudar a alguien más, con suerte. Puede devolver tanto el flujo de escritura como la promesa, de modo que pueda obtener datos de respuesta cuando finalice la carga.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

Y puede usar la función de la siguiente manera:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

Ahora puede verificar la promesa:

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

O como stream.pipe()retorna stream.Writable, el destino (variable writeStream arriba), permitiendo una cadena de tuberías, también podemos usar sus eventos:

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });
Ahmet Cetin
fuente
Se ve muy bien, pero de mi lado recibo este error stackoverflow.com/questions/62330721/…
Arco Voltaico
acaba de responder a su pregunta. Espero eso ayude.
Ahmet Cetin
48

En la respuesta aceptada, la función finaliza antes de que se complete la carga y, por lo tanto, es incorrecta. El siguiente código se canaliza correctamente desde una secuencia legible.

Cargar referencia

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}

También puede ir un paso más allá y generar información de progreso usando ManagedUploadcomo tal:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

Referencia de ManagedUpload

Una lista de eventos disponibles

tsuz
fuente
1
aws-sdk ahora ofrece promesas integradas en 2.3.0+, por lo que ya no tiene que levantarlas. s3.upload (params) .promise (). then (data => data) .catch (error => error);
DBrown
1
@DBrown ¡Gracias por el puntero! Actualicé la respuesta, en consecuencia.
tsuz
1
@tsuz, al intentar implementar su solución me da un error: ¿ TypeError: dest.on is not a functionalguna idea de por qué?
FireBrand
¿Qué es dest.on? ¿Puedes mostrar un ejemplo? @FireBrand
tsuz
9
Esto dice que la respuesta aceptada está incompleta pero no funciona con la canalización a s3.upload como se indica en la publicación actualizada de @ Womp. ¡Sería muy útil si esta respuesta se actualizara para tomar la salida canalizada de otra cosa!
MattW
6

Ninguna de las respuestas me funcionó porque quería:

  • Tubo en s3.upload()
  • Canalice el resultado de s3.upload()a otra corriente

La respuesta aceptada no hace lo último. Los demás se basan en la API de promesa, que es engorrosa de trabajar cuando se trabaja con tuberías de flujo.

Esta es mi modificación de la respuesta aceptada.

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})

cortopy
fuente
Se ve muy bien, pero por mi parte recibo este error stackoverflow.com/questions/62330721/…
Arco Voltaico
5

Solución de tipo script:
este ejemplo utiliza:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

Y función asincrónica:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

Llame a este método en algún lugar como:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
dzole vladimirov
fuente
4

Lo que hay que tener en cuenta en la respuesta más aceptada anterior es que: debe devolver el pase en la función si está usando pipe like,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

De lo contrario, pasará silenciosamente a la siguiente sin arrojar un error o arrojará un error TypeError: dest.on is not a functiondependiendo de cómo haya escrito la función

varun bhaya
fuente
3

Si ayuda a alguien, pude transmitir desde el cliente a s3 con éxito:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

El código del lado del servidor asume que reqes un objeto de flujo, en mi caso fue enviado desde el cliente con la información del archivo configurada en los encabezados.

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};

Sí, rompe las convenciones, pero si miras la esencia, es mucho más limpio que cualquier otra cosa que encontré usando multer, ayudante de camarero, etc.

+1 por pragmatismo y gracias a @SalehenRahman por su ayuda.

mattdlockyer
fuente
multer, ayudante de camarero maneja cargas de datos de formularios / múltiples partes. req as a stream funciona cuando el cliente envía un búfer como cuerpo desde XMLHttpRequest.
André Werlang
Para aclarar, la carga se realiza desde el back-end, no el cliente, ¿verdad?
numX
Sí, está "canalizando" la transmisión, en el backend, pero proviene de un frontend
mattdlockyer
3

Para aquellos que se quejan de que cuando usan la función de carga de la API de s3 y un archivo de cero bytes termina en s3 (@ Radar155 y @gabo), también tuve este problema.

Cree un segundo flujo PassThrough y simplemente canalice todos los datos del primero al segundo y pase la referencia a ese segundo a s3. Puede hacer esto de un par de formas diferentes; posiblemente una forma sucia es escuchar el evento "datos" en la primera secuencia y luego escribir esos mismos datos en la segunda secuencia, de manera similar para el evento "final", simplemente llame la función final en la segunda secuencia. No tengo idea de si se trata de un error en la API de AWS, la versión del nodo o algún otro problema, pero me solucionó el problema.

Así es como podría verse:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});
Tim
fuente
Esto realmente funcionó para mí también. La función de carga de S3 simplemente "murió" silenciosamente cada vez que se usaba una carga de varias partes, pero cuando usaba su solución, funcionaba bien (!). ¡Gracias! :)
jhdrn
¿Puede darnos información sobre por qué se necesita la segunda transmisión?
noob7
1

Siguiendo las otras respuestas y usando el último AWS SDK para Node.js, hay una solución mucho más limpia y simple, ya que la función upload () de s3 acepta una transmisión, usando la sintaxis de espera y la promesa de S3:

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();
emich
fuente
0

Estoy usando KnexJS y tuve un problema al usar su API de transmisión. Finalmente lo arreglé, espero que lo siguiente ayude a alguien.

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();
TestWell
fuente
-3

Si conoce el tamaño de la transmisión, puede usar minio-js para cargar la transmisión de esta manera:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })
Krishna Srinivas
fuente