Actualmente estoy haciendo uso de un complemento de node.js llamado s3-upload-stream para transmitir archivos muy grandes a Amazon S3. Utiliza la API de varias partes y, en su mayor parte, funciona muy bien.
Sin embargo, este módulo muestra su antigüedad y ya tuve que hacerle modificaciones (el autor también lo ha desaprobado). Hoy me encontré con otro problema con Amazon, y realmente me gustaría tomar la recomendación del autor y comenzar a usar el aws-sdk oficial para realizar mis cargas.
PERO.
El SDK oficial no parece admitir la conexión a s3.upload()
. La naturaleza de s3.upload es que debe pasar la secuencia legible como argumento al constructor de S3.
Tengo aproximadamente más de 120 módulos de código de usuario que procesan varios archivos y son independientes del destino final de su salida. El motor les entrega un flujo de salida escribible y canalizable, y lo canalizan. No puedo entregarles un AWS.S3
objeto y pedirles que lo llamen upload()
sin agregar código a todos los módulos. La razón por la que utilicé s3-upload-stream
fue porque soportaba tuberías.
¿Hay alguna manera de hacer que aws-sdk sea s3.upload()
algo a lo que pueda canalizar la transmisión?
s3
parámetros dentro de la tuberíastream
?Una respuesta un poco tardía, podría ayudar a alguien más, con suerte. Puede devolver tanto el flujo de escritura como la promesa, de modo que pueda obtener datos de respuesta cuando finalice la carga.
const AWS = require('aws-sdk'); const stream = require('stream'); const uploadStream = ({ Bucket, Key }) => { const s3 = new AWS.S3(); const pass = new stream.PassThrough(); return { writeStream: pass, promise: s3.upload({ Bucket, Key, Body: pass }).promise(), }; }
Y puede usar la función de la siguiente manera:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'}); const readStream = fs.createReadStream('/path/to/yourfile.mp4'); const pipeline = readStream.pipe(writeStream);
Ahora puede verificar la promesa:
promise.then(() => { console.log('upload completed successfully'); }).catch((err) => { console.log('upload failed.', err.message); });
O como
stream.pipe()
retorna stream.Writable, el destino (variable writeStream arriba), permitiendo una cadena de tuberías, también podemos usar sus eventos:pipeline.on('close', () => { console.log('upload successful'); }); pipeline.on('error', (err) => { console.log('upload failed', err.message) });
fuente
En la respuesta aceptada, la función finaliza antes de que se complete la carga y, por lo tanto, es incorrecta. El siguiente código se canaliza correctamente desde una secuencia legible.
Cargar referencia
async function uploadReadableStream(stream) { const params = {Bucket: bucket, Key: key, Body: stream}; return s3.upload(params).promise(); } async function upload() { const readable = getSomeReadableStream(); const results = await uploadReadableStream(readable); console.log('upload complete', results); }
También puede ir un paso más allá y generar información de progreso usando
ManagedUpload
como tal:const manager = s3.upload(params); manager.on('httpUploadProgress', (progress) => { console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' } });
Referencia de ManagedUpload
Una lista de eventos disponibles
fuente
TypeError: dest.on is not a function
alguna idea de por qué?dest.on
? ¿Puedes mostrar un ejemplo? @FireBrandNinguna de las respuestas me funcionó porque quería:
s3.upload()
s3.upload()
a otra corrienteLa respuesta aceptada no hace lo último. Los demás se basan en la API de promesa, que es engorrosa de trabajar cuando se trabaja con tuberías de flujo.
Esta es mi modificación de la respuesta aceptada.
const s3 = new S3(); function writeToS3({Key, Bucket}) { const Body = new stream.PassThrough(); s3.upload({ Body, Key, Bucket: process.env.adpBucket }) .on('httpUploadProgress', progress => { console.log('progress', progress); }) .send((err, data) => { if (err) { Body.destroy(err); } else { console.log(`File uploaded and available at ${data.Location}`); Body.destroy(); } }); return Body; } const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket}); pipeline.on('close', () => { // upload finished, do something else }) pipeline.on('error', () => { // upload wasn't successful. Handle it })
fuente
Solución de tipo script:
este ejemplo utiliza:
import * as AWS from "aws-sdk"; import * as fsExtra from "fs-extra"; import * as zlib from "zlib"; import * as stream from "stream";
Y función asincrónica:
public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => { const passT = new stream.PassThrough(); return { writeStream: passT, promise: S3.upload({ Bucket, Key, Body: passT }).promise(), }; }; const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key); fsExtra.createReadStream(filePath).pipe(writeStream); // NOTE: Addition You can compress to zip by .pipe(zlib.createGzip()).pipe(writeStream) let output = true; await promise.catch((reason)=> { output = false; console.log(reason);}); return output; }
Llame a este método en algún lugar como:
let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
fuente
Lo que hay que tener en cuenta en la respuesta más aceptada anterior es que: debe devolver el pase en la función si está usando pipe like,
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
function anyUploadFunction () { let pass = new stream.PassThrough(); return pass // <- Returning this pass is important for the stream to understand where it needs to write to. }
De lo contrario, pasará silenciosamente a la siguiente sin arrojar un error o arrojará un error
TypeError: dest.on is not a function
dependiendo de cómo haya escrito la funciónfuente
Si ayuda a alguien, pude transmitir desde el cliente a s3 con éxito:
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
El código del lado del servidor asume que
req
es un objeto de flujo, en mi caso fue enviado desde el cliente con la información del archivo configurada en los encabezados.const fileUploadStream = (req, res) => { //get "body" args from header const { id, fn } = JSON.parse(req.get('body')); const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn const params = { Key, Bucket: bucketName, //set somewhere Body: req, //req is a stream }; s3.upload(params, (err, data) => { if (err) { res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack)); } else { res.send(Key); } }); };
Sí, rompe las convenciones, pero si miras la esencia, es mucho más limpio que cualquier otra cosa que encontré usando multer, ayudante de camarero, etc.
+1 por pragmatismo y gracias a @SalehenRahman por su ayuda.
fuente
Para aquellos que se quejan de que cuando usan la función de carga de la API de s3 y un archivo de cero bytes termina en s3 (@ Radar155 y @gabo), también tuve este problema.
Cree un segundo flujo PassThrough y simplemente canalice todos los datos del primero al segundo y pase la referencia a ese segundo a s3. Puede hacer esto de un par de formas diferentes; posiblemente una forma sucia es escuchar el evento "datos" en la primera secuencia y luego escribir esos mismos datos en la segunda secuencia, de manera similar para el evento "final", simplemente llame la función final en la segunda secuencia. No tengo idea de si se trata de un error en la API de AWS, la versión del nodo o algún otro problema, pero me solucionó el problema.
Así es como podría verse:
var PassThroughStream = require('stream').PassThrough; var srcStream = new PassThroughStream(); var rstream = fs.createReadStream('Learning/stocktest.json'); var sameStream = rstream.pipe(srcStream); // interesting note: (srcStream == sameStream) at this point var destStream = new PassThroughStream(); // call your s3.upload function here - passing in the destStream as the Body parameter srcStream.on('data', function (chunk) { destStream.write(chunk); }); srcStream.on('end', function () { dataStream.end(); });
fuente
Siguiendo las otras respuestas y usando el último AWS SDK para Node.js, hay una solución mucho más limpia y simple, ya que la función upload () de s3 acepta una transmisión, usando la sintaxis de espera y la promesa de S3:
var model = await s3Client.upload({ Bucket : bucket, Key : key, ContentType : yourContentType, Body : fs.createReadStream(path-to-file) }).promise();
fuente
Estoy usando KnexJS y tuve un problema al usar su API de transmisión. Finalmente lo arreglé, espero que lo siguiente ayude a alguien.
const knexStream = knex.select('*').from('my_table').stream(); const passThroughStream = new stream.PassThrough(); knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n')); knexStream.on('end', () => passThroughStream.end()); const uploadResult = await s3 .upload({ Bucket: 'my-bucket', Key: 'stream-test.txt', Body: passThroughStream }) .promise();
fuente
Si conoce el tamaño de la transmisión, puede usar minio-js para cargar la transmisión de esta manera:
s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) { if (e) { return console.log(e) } console.log("Successfully uploaded the stream") })
fuente