Tengo un código que está iterando sobre una lista que fue consultada desde una base de datos y haciendo una solicitud HTTP para cada elemento en esa lista. Esa lista a veces puede ser un número razonablemente grande (en miles), y me gustaría asegurarme de que no estoy llegando a un servidor web con miles de solicitudes HTTP simultáneas.
Una versión abreviada de este código actualmente se parece a esto ...
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
Este código se ejecuta en el nodo 4.3.2. Para reiterar, ¿se Promise.all
puede administrar de manera que solo un cierto número de Promesas estén en progreso en un momento dado?
javascript
node.js
es6-promise
Chris
fuente
fuente
Promise.all
sí gestiona la progresión de las promesas: las promesas lo hacen por sí mismas,Promise.all
solo las espera.Promise
antipatrón del constructor !Respuestas:
Tenga en cuenta que
Promise.all()
no activa las promesas para comenzar su trabajo, la creación de la promesa sí lo hace.Con eso en mente, una solución sería verificar cada vez que se resuelve una promesa si se debe iniciar una nueva promesa o si ya está en el límite.
Sin embargo, aquí no es necesario reinventar la rueda. Una biblioteca que puede utilizar para este propósito es
es6-promise-pool
. De sus ejemplos:// On the Web, leave out this line and use the script tag above instead. var PromisePool = require('es6-promise-pool') var promiseProducer = function () { // Your code goes here. // If there is work left to be done, return the next work item as a promise. // Otherwise, return null to indicate that all promises have been created. // Scroll down for an example. } // The number of promises to process simultaneously. var concurrency = 3 // Create a pool. var pool = new PromisePool(promiseProducer, concurrency) // Start the pool. var poolPromise = pool.start() // Wait for the pool to settle. poolPromise.then(function () { console.log('All promises fulfilled') }, function (error) { console.log('Some promise rejected: ' + error.message) })
fuente
Límite P
He comparado la limitación de concurrencia de promesas con un script personalizado, bluebird, es6-promise-pool y p-limit. Creo que p-limit tiene la implementación más simple y reducida para esta necesidad. Consulte su documentación .
Requisitos
Para ser compatible con async en el ejemplo
Mi ejemplo
En este ejemplo, necesitamos ejecutar una función para cada URL en la matriz (como, tal vez, una solicitud de API). Aquí esto se llama
fetchData()
. Si tuviéramos una matriz de miles de elementos para procesar, la simultaneidad definitivamente sería útil para ahorrar en recursos de CPU y memoria.const pLimit = require('p-limit'); // Example Concurrency of 3 promise at once const limit = pLimit(3); let urls = [ "http://www.exampleone.com/", "http://www.exampletwo.com/", "http://www.examplethree.com/", "http://www.examplefour.com/", ] // Create an array of our promises using map (fetchData() returns a promise) let promises = urls.map(url => { // wrap the function we are calling in the limit function we defined above return limit(() => fetchData(url)); }); (async () => { // Only three promises are run at once (as defined above) const result = await Promise.all(promises); console.log(result); })();
El resultado del registro de la consola es una matriz de los datos de respuesta a las promesas resueltas.
fuente
Utilizando
Array.prototype.splice
while (funcs.length) { // 100 at at time await Promise.all( funcs.splice(0, 100).map(f => f()) ) }
fuente
arr.splice(-100)
su lugar, si la orden no coincide, tal vez pueda revertir la matriz: PSi sabe cómo funcionan los iteradores y cómo se consumen, no necesitaría ninguna biblioteca adicional, ya que puede resultar muy fácil crear su propia simultaneidad. Déjame demostrarte:
/* [Symbol.iterator]() is equivalent to .values() const iterator = [1,2,3][Symbol.iterator]() */ const iterator = [1,2,3].values() // loop over all items with for..of for (const x of iterator) { console.log('x:', x) // notices how this loop continues the same iterator // and consumes the rest of the iterator, making the // outer loop not logging any more x's for (const y of iterator) { console.log('y:', y) } }
Podemos usar el mismo iterador y compartirlo con los trabajadores.
Si hubiera usado en
.entries()
lugar de.values()
, habría obtenido una matriz 2D con la[[index, value]]
que demostraré a continuación con una concurrencia de 2const sleep = t => new Promise(rs => setTimeout(rs, t)) async function doWork(iterator) { for (let [index, item] of iterator) { await sleep(1000) console.log(index + ': ' + item) } } const iterator = Array.from('abcdefghij').entries() const workers = new Array(2).fill(iterator).map(doWork) // ^--- starts two workers sharing the same iterator Promise.allSettled(workers).then(() => console.log('done'))
El beneficio de esto es que puede tener una función de generador en lugar de tener todo listo a la vez.
Nota: lo diferente de esto en comparación con el ejemplo async-pool es que genera dos trabajadores, por lo que si un trabajador arroja un error por alguna razón en el índice 5, no evitará que el otro trabajador haga el resto. Así que pasas de hacer 2 simultaneidad a 1. (para que no se detenga ahí) Entonces mi consejo es que detectes todos los errores dentro de la
doWork
funciónfuente
Promise.map de bluebird puede tomar una opción de concurrencia para controlar cuántas promesas deben ejecutarse en paralelo. A veces es más fácil que
.all
porque no necesita crear la matriz de promesa.const Promise = require('bluebird') function getCounts() { return Promise.map(users, user => { return new Promise(resolve => { remoteServer.getCount(user) // makes an HTTP request .then(() => { /* snip */ resolve(); }); }); }, {concurrency: 10}); // <---- at most 10 http requests at a time }
fuente
En lugar de usar promesas para limitar las solicitudes http, use http.Agent.maxSockets integrado en el nodo . Esto elimina el requisito de usar una biblioteca o escribir su propio código de agrupación y tiene la ventaja adicional de tener más control sobre lo que está limitando.
Por ejemplo:
var http = require('http'); var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin var request = http.request({..., agent: agent}, ...);
Si realiza varias solicitudes al mismo origen, también podría ser beneficioso establecerlo
keepAlive
en verdadero (consulte los documentos anteriores para obtener más información).fuente
Sugiero la biblioteca async-pool: https://github.com/rxaviers/async-pool
npm install tiny-async-pool
Descripción:
Uso:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); await asyncPool(2, [1000, 5000, 3000, 2000], timeout); // Call iterator (i = 1000) // Call iterator (i = 5000) // Pool limit of 2 reached, wait for the quicker one to complete... // 1000 finishes // Call iterator (i = 3000) // Pool limit of 2 reached, wait for the quicker one to complete... // 3000 finishes // Call iterator (i = 2000) // Itaration is complete, wait until running ones complete... // 5000 finishes // 2000 finishes // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
fuente
Se puede resolver mediante recursividad.
La idea es que inicialmente envíe la cantidad máxima permitida de solicitudes y cada una de estas solicitudes debe continuar enviándose de forma recursiva una vez completada.
function batchFetch(urls, concurrentRequestsLimit) { return new Promise(resolve => { var documents = []; var index = 0; function recursiveFetch() { if (index === urls.length) { return; } fetch(urls[index++]).then(r => { documents.push(r.text()); if (documents.length === urls.length) { resolve(documents); } else { recursiveFetch(); } }); } for (var i = 0; i < concurrentRequestsLimit; i++) { recursiveFetch(); } }); } var sources = [ 'http://www.example_1.com/', 'http://www.example_2.com/', 'http://www.example_3.com/', ... 'http://www.example_100.com/' ]; batchFetch(sources, 5).then(documents => { console.log(documents); });
fuente
Aquí está mi solución ES7 para copiar y pegar y contar con una característica completa
Promise.all()
/map()
alternativa, con un límite de concurrencia.Similar a
Promise.all()
, mantiene el orden de devolución, así como un respaldo para los valores de devolución no prometidos.También incluí una comparación de las diferentes implementaciones, ya que ilustra algunos aspectos que algunas de las otras soluciones han pasado por alto.
Uso
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay)); const args = [30, 20, 15, 10]; await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
Implementación
async function asyncBatch(args, fn, limit = 8) { // Copy arguments to avoid side effects args = [...args]; const outs = []; while (args.length) { const batch = args.splice(0, limit); const out = await Promise.all(batch.map(fn)); outs.push(...out); } return outs; } async function asyncPool(args, fn, limit = 8) { return new Promise((resolve) => { // Copy arguments to avoid side effect, reverse queue as // pop is faster than shift const argQueue = [...args].reverse(); let count = 0; const outs = []; const pollNext = () => { if (argQueue.length === 0 && count === 0) { resolve(outs); } else { while (count < limit && argQueue.length) { const index = args.length - argQueue.length; const arg = argQueue.pop(); count += 1; const out = fn(arg); const processOut = (out, index) => { outs[index] = out; count -= 1; pollNext(); }; if (typeof out === 'object' && out.then) { out.then(out => processOut(out, index)); } else { processOut(out, index); } } } }; pollNext(); }); }
Comparación
// A simple async function that returns after the given delay // and prints its value to allow us to determine the response order const asyncFn = delay => new Promise(resolve => setTimeout(() => { console.log(delay); resolve(delay); }, delay)); // List of arguments to the asyncFn function const args = [30, 20, 15, 10]; // As a comparison of the different implementations, a low concurrency // limit of 2 is used in order to highlight the performance differences. // If a limit greater than or equal to args.length is used the results // would be identical. // Vanilla Promise.all/map combo const out1 = await Promise.all(args.map(arg => asyncFn(arg))); // prints: 10, 15, 20, 30 // total time: 30ms // Pooled implementation const out2 = await asyncPool(args, arg => asyncFn(arg), 2); // prints: 20, 30, 15, 10 // total time: 40ms // Batched implementation const out3 = await asyncBatch(args, arg => asyncFn(arg), 2); // prints: 20, 30, 20, 30 // total time: 45ms console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3 // Conclusion: Execution order and performance is different, // but return order is still identical
Conclusión
asyncPool()
debería ser la mejor solución, ya que permite que se inicien nuevas solicitudes tan pronto como finalice una anterior.asyncBatch()
se incluye como una comparación, ya que su implementación es más simple de entender, pero debería tener un rendimiento más lento, ya que todas las solicitudes del mismo lote deben finalizar para iniciar el siguiente lote.En este ejemplo artificial, la vainilla no limitada
Promise.all()
es, por supuesto, la más rápida, mientras que las otras podrían tener un desempeño más deseable en un escenario de congestión del mundo real.Actualizar
La biblioteca async-pool que otros ya han sugerido es probablemente una mejor alternativa a mi implementación, ya que funciona casi de manera idéntica y tiene una implementación más concisa con un uso inteligente de Promise.race (): https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js
Espero que mi respuesta pueda tener un valor educativo.
fuente
Aquí va un ejemplo básico para streaming y 'p-limit'. Transmite http read stream a mongo db.
const stream = require('stream'); const util = require('util'); const pLimit = require('p-limit'); const es = require('event-stream'); const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; const pipeline = util.promisify(stream.pipeline) const outputDBConfig = { dbURL: 'yr-db-url', collection: 'some-collection' }; const limit = pLimit(3); async yrAsyncStreamingFunction(readStream) => { const mongoWriteStream = streamToMongoDB(outputDBConfig); const mapperStream = es.map((data, done) => { let someDataPromise = limit(() => yr_async_call_to_somewhere()) someDataPromise.then( function handleResolve(someData) { data.someData = someData; done(null, data); }, function handleError(error) { done(error) } ); }) await pipeline( readStream, JSONStream.parse('*'), mapperStream, mongoWriteStream ); }
fuente
Así que traté de hacer que algunos ejemplos que se muestran funcionen para mi código, pero como esto era solo para un script de importación y no para un código de producción, usar el paquete npm batch-promises seguramente fue la ruta más fácil para mí
NOTA: Requiere tiempo de ejecución para admitir Promise o para ser polietileno.
Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) La Promesa: Iteratee se llamará después de cada lote.
Utilizar:
batch-promises Easily batch promises NOTE: Requires runtime to support Promise or to be polyfilled. Api batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) The Promise: Iteratee will be called after each batch. Use: import batchPromises from 'batch-promises'; batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { // The iteratee will fire after each batch resulting in the following behaviour: // @ 100ms resolve items 1 and 2 (first batch of 2) // @ 200ms resolve items 3 and 4 (second batch of 2) // @ 300ms resolve remaining item 5 (last remaining batch) setTimeout(() => { resolve(i); }, 100); })) .then(results => { console.log(results); // [1,2,3,4,5] });
fuente
La recursividad es la respuesta si no desea utilizar bibliotecas externas
downloadAll(someArrayWithData){ var self = this; var tracker = function(next){ return self.someExpensiveRequest(someArrayWithData[next]) .then(function(){ next++;//This updates the next in the tracker function parameter if(next < someArrayWithData.length){//Did I finish processing all my data? return tracker(next);//Go to the next promise } }); } return tracker(0); }
fuente
Esto es lo que hice usando
Promise.race
, dentro de mi código aquíconst identifyTransactions = async function() { let promises = [] let concurrency = 0 for (let tx of this.transactions) { if (concurrency > 4) await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) promises.push(tx.identifyTransaction()) concurrency++ } if (promises.length > 0) await Promise.race(promises) //resolve the rest }
Si quieres ver un ejemplo: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
fuente
Promise.race
Siempre que sea posible, intento desarrollar este tipo de cosas por mi cuenta, en lugar de buscar una biblioteca. Terminas aprendiendo muchos conceptos que antes parecían abrumadores.
¿Qué piensan ustedes de este intento?
(Lo pensé mucho y creo que está funcionando, pero señalen si no es así o si hay algo fundamentalmente mal)
class Pool{ constructor(maxAsync) { this.maxAsync = maxAsync; this.asyncOperationsQueue = []; this.currentAsyncOperations = 0 } runAnother() { if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) { this.currentAsyncOperations += 1; this.asyncOperationsQueue.pop()() .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() }) } } add(f){ // the argument f is a function of signature () => Promise this.runAnother(); return new Promise((resolve, reject) => { this.asyncOperationsQueue.push( () => f().then(resolve).catch(reject) ) }) } } //####################################################### // TESTS //####################################################### function dbCall(id, timeout, fail) { return new Promise((resolve, reject) => { setTimeout(() => { if (fail) { reject(`Error for id ${id}`); } else { resolve(id); } }, timeout) } ) } const dbQuery1 = () => dbCall(1, 5000, false); const dbQuery2 = () => dbCall(2, 5000, false); const dbQuery3 = () => dbCall(3, 5000, false); const dbQuery4 = () => dbCall(4, 5000, true); const dbQuery5 = () => dbCall(5, 5000, false); const cappedPool = new Pool(2); const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`)) const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))
Este enfoque proporciona una buena API, similar a los grupos de subprocesos en scala / java.
Después de crear una instancia del grupo con
const cappedPool = new Pool(2)
, le promete simplementecappedPool.add(() => myPromise)
.Sin darnos cuenta debemos asegurarnos de que la promesa no se inicie de inmediato y por eso debemos "brindarla perezosamente" con la ayuda de la función.
Lo más importante
add
es que tenga en cuenta que el resultado del método es una Promesa que se completará / resolverá con el valor de su promesa original . Esto lo convierte en un uso muy intuitivo.const resultPromise = cappedPool.add( () => dbCall(...)) resultPromise .then( actualResult => { // Do something with the result form the DB } )
fuente
Desafortunadamente, no hay forma de hacerlo con el Promise.all nativo, por lo que debes ser creativo.
Esta es la forma más rápida y concisa que pude encontrar sin usar bibliotecas externas.
Hace uso de una característica de JavaScript más nueva llamada iterador. Básicamente, el iterador realiza un seguimiento de los elementos que se han procesado y los que no.
Para usarlo en el código, crea una matriz de funciones asíncronas. Cada función asincrónica solicita al mismo iterador el siguiente elemento que debe procesarse. Cada función procesa su propio elemento de forma asincrónica y, cuando termina, solicita al iterador uno nuevo. Una vez que el iterador se queda sin elementos, todas las funciones se completan.
Gracias a @Endless por la inspiración.
var items = [ "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", "https://www.stackoverflow.com", ]; var concurrency = 5 Array(concurrency).fill(items.entries()).map(async (cursor) => { for(let [index, url] of cursor){ console.log("getting url is ", index, url); // run your async task instead of this next line var text = await fetch(url).then(res => res.text()); console.log("text is", text.slice(0,20)); } })
fuente
Tantas buenas soluciones. Comencé con la elegante solución publicada por @Endless y terminé con este pequeño método de extensión que no usa bibliotecas externas ni se ejecuta en lotes (aunque se supone que tiene características como async, etc.):
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; };
Promise.allWithLimit = async (taskList, limit = 5) => { const iterator = taskList.entries(); let results = new Array(taskList.length); let workerThreads = new Array(limit).fill(0).map(() => new Promise(async (resolve, reject) => { try { let entry = iterator.next(); while (!entry.done) { let [index, promise] = entry.value; try { results[index] = await promise; entry = iterator.next(); } catch (err) { results[index] = err; } } // No more work to do resolve(true); } catch (err) { // This worker is dead reject(err); } })); await Promise.all(workerThreads); return results; }; const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => { let n = (i + 1) * 5; setTimeout(() => { console.log(`Did nothing for ${n} seconds`); resolve(n); }, n * 1000); })); var results = Promise.allWithLimit(demoTasks);
fuente