¿Cuál es la mejor manera de limitar la simultaneidad cuando se usa Promise.all () de ES6?

98

Tengo un código que está iterando sobre una lista que fue consultada desde una base de datos y haciendo una solicitud HTTP para cada elemento en esa lista. Esa lista a veces puede ser un número razonablemente grande (en miles), y me gustaría asegurarme de que no estoy llegando a un servidor web con miles de solicitudes HTTP simultáneas.

Una versión abreviada de este código actualmente se parece a esto ...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

Este código se ejecuta en el nodo 4.3.2. Para reiterar, ¿se Promise.allpuede administrar de manera que solo un cierto número de Promesas estén en progreso en un momento dado?

Chris
fuente
3
No olvide que Promise.allsí gestiona la progresión de las promesas: las promesas lo hacen por sí mismas, Promise.allsolo las espera.
Bergi
1
¡Evita el Promiseantipatrón del constructor !
Bergi

Respuestas:

51

Tenga en cuenta que Promise.all()no activa las promesas para comenzar su trabajo, la creación de la promesa sí lo hace.

Con eso en mente, una solución sería verificar cada vez que se resuelve una promesa si se debe iniciar una nueva promesa o si ya está en el límite.

Sin embargo, aquí no es necesario reinventar la rueda. Una biblioteca que puede utilizar para este propósito eses6-promise-pool . De sus ejemplos:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
Timo
fuente
25
Es lamentable que es6-promise-pool reinvente Promise en lugar de usarlos. Sugiero esta solución concisa en su lugar (si ya está usando ES6 o ES7) github.com/rxaviers/async-pool
Rafael Xavier
3
Eché un vistazo a ambos, ¡async-pool se ve mucho mejor! Más sencillo y más ligero.
Sin fin
2
También he encontrado que p-limit es la implementación más simple. Vea mi ejemplo a continuación. stackoverflow.com/a/52262024/8177355
Matthew Rideout
2
Creo que tiny-asyc-pool es una solución mucho mejor, no intrusiva y bastante natural para limitar la concurrencia de promesas.
Sunny Tambi
73

Límite P

He comparado la limitación de concurrencia de promesas con un script personalizado, bluebird, es6-promise-pool y p-limit. Creo que p-limit tiene la implementación más simple y reducida para esta necesidad. Consulte su documentación .

Requisitos

Para ser compatible con async en el ejemplo

Mi ejemplo

En este ejemplo, necesitamos ejecutar una función para cada URL en la matriz (como, tal vez, una solicitud de API). Aquí esto se llama fetchData(). Si tuviéramos una matriz de miles de elementos para procesar, la simultaneidad definitivamente sería útil para ahorrar en recursos de CPU y memoria.

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

El resultado del registro de la consola es una matriz de los datos de respuesta a las promesas resueltas.

Matthew Rideout
fuente
4
¡Gracias por este! Este es mucho más simple
Juan
3
Esta fue, con mucho, la mejor biblioteca que he visto para limitar las solicitudes simultáneas. ¡Y gran ejemplo, gracias!
Chris Livdahl
2
Gracias por hacer la comparación. ¿Ha comparado con github.com/rxaviers/async-pool ?
ahong
1
Fácil de usar, gran elección.
drmrbrewer
22

Utilizando Array.prototype.splice

while (funcs.length) {
  // 100 at at time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
caviar desacelerado
fuente
2
Esta es una solución infravalorada. Amo la sencillez.
Brannon
8
Esto ejecuta funciones en lotes en lugar de en grupo, donde una función se llama inmediatamente cuando termina otra.
cltsang
¡Me encantó esta solución!
prasun
tomó un segundo para comprender lo que está haciendo con la falta de más contexto a su alrededor, como si fuera un lote en lugar de un grupo, por ejemplo. Está reordenando la matriz cada vez que empalma desde el principio o en el medio. (el navegador tiene que volver a indexar todos los elementos) un rendimiento teórico mejor alternativo es tomar cosas del final en arr.splice(-100)su lugar, si la orden no coincide, tal vez pueda revertir la matriz: P
Sin fin
Muy útil para correr en lotes. Nota: el siguiente lote no comenzará hasta que el lote actual esté completo al 100%.
Casey Dwayne
20

Si sabe cómo funcionan los iteradores y cómo se consumen, no necesitaría ninguna biblioteca adicional, ya que puede resultar muy fácil crear su propia simultaneidad. Déjame demostrarte:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

Podemos usar el mismo iterador y compartirlo con los trabajadores.

Si hubiera usado en .entries()lugar de .values(), habría obtenido una matriz 2D con la [[index, value]]que demostraré a continuación con una concurrencia de 2

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

El beneficio de esto es que puede tener una función de generador en lugar de tener todo listo a la vez.


Nota: lo diferente de esto en comparación con el ejemplo async-pool es que genera dos trabajadores, por lo que si un trabajador arroja un error por alguna razón en el índice 5, no evitará que el otro trabajador haga el resto. Así que pasas de hacer 2 simultaneidad a 1. (para que no se detenga ahí) Entonces mi consejo es que detectes todos los errores dentro de la doWorkfunción

Interminable
fuente
¡Esto es asombroso! ¡Gracias Endless!
user3413723
¡Este es definitivamente un enfoque genial! Solo asegúrate de que tu simultaneidad no exceda la longitud de tu lista de tareas (si de todos modos te preocupan los resultados) ¡ya que podrías terminar con extras!
Kris Oye
Algo que podría ser más interesante más adelante es cuando Streams obtenga soporte para Readable. From (iterator) . Chrome ya ha hecho que las transmisiones sean transferibles . por lo que podría crear transmisiones legibles y enviarlas a los trabajadores web, y todas terminarían usando el mismo iterador subyacente.
Sin fin
16

Promise.map de bluebird puede tomar una opción de concurrencia para controlar cuántas promesas deben ejecutarse en paralelo. A veces es más fácil que .allporque no necesita crear la matriz de promesa.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}
Jingshao Chen
fuente
bluebird es excelente si necesita promesas más rápidas y ~ 18kb de basura adicional si solo lo usa para una cosa;)
Endless
1
Todo depende de lo importante que sea una cosa para ti y si hay otra forma más rápida / fácil mejor. Una compensación típica. Elegiré la facilidad de uso y la función en unos pocos kb, pero YMMV.
Jingshao Chen
11

En lugar de usar promesas para limitar las solicitudes http, use http.Agent.maxSockets integrado en el nodo . Esto elimina el requisito de usar una biblioteca o escribir su propio código de agrupación y tiene la ventaja adicional de tener más control sobre lo que está limitando.

agent.maxSockets

Por defecto establecido en Infinito. Determina cuántos sockets simultáneos puede tener abiertos el agente por origen. El origen es una combinación de 'host: puerto' o 'host: puerto: localAddress'.

Por ejemplo:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

Si realiza varias solicitudes al mismo origen, también podría ser beneficioso establecerlo keepAliveen verdadero (consulte los documentos anteriores para obtener más información).

tcooc
fuente
11
Aún así, ¿crear miles de cierres de inmediato y agrupar sockets no parece ser muy eficiente?
Bergi
3

Sugiero la biblioteca async-pool: https://github.com/rxaviers/async-pool

npm install tiny-async-pool

Descripción:

Ejecute múltiples funciones asíncronas y de devolución de promesas con simultaneidad limitada utilizando ES6 / ES7 nativo

asyncPool ejecuta múltiples funciones asíncronas y de devolución de promesas en un grupo de simultaneidad limitado. Rechaza inmediatamente tan pronto como una de las promesas rechaza. Se resuelve cuando se cumplen todas las promesas. Llama a la función de iterador lo antes posible (por debajo del límite de concurrencia).

Uso:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
Venryx
fuente
1
Funciona para mi. Gracias. Esta es una gran biblioteca.
Sunny Tambi
2

Se puede resolver mediante recursividad.

La idea es que inicialmente envíe la cantidad máxima permitida de solicitudes y cada una de estas solicitudes debe continuar enviándose de forma recursiva una vez completada.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++]).then(r => {
                documents.push(r.text());
                if (documents.length === urls.length) {
                    resolve(documents);
                } else {
                    recursiveFetch();
                }
            });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});
Anton Fil
fuente
2

Aquí está mi solución ES7 para copiar y pegar y contar con una característica completa Promise.all()/ map()alternativa, con un límite de concurrencia.

Similar a Promise.all(), mantiene el orden de devolución, así como un respaldo para los valores de devolución no prometidos.

También incluí una comparación de las diferentes implementaciones, ya que ilustra algunos aspectos que algunas de las otras soluciones han pasado por alto.

Uso

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

Implementación

async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  }
  return outs;
}

async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
        resolve(outs);
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
            pollNext();
          };
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);
          }
        }
      }
    };
    pollNext();
  });
}

Comparación

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
  console.log(delay);
  resolve(delay);
}, delay));

// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];

// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.

// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms

// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms

// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms

console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3

// Conclusion: Execution order and performance is different,
// but return order is still identical

Conclusión

asyncPool() debería ser la mejor solución, ya que permite que se inicien nuevas solicitudes tan pronto como finalice una anterior.

asyncBatch() se incluye como una comparación, ya que su implementación es más simple de entender, pero debería tener un rendimiento más lento, ya que todas las solicitudes del mismo lote deben finalizar para iniciar el siguiente lote.

En este ejemplo artificial, la vainilla no limitada Promise.all()es, por supuesto, la más rápida, mientras que las otras podrían tener un desempeño más deseable en un escenario de congestión del mundo real.

Actualizar

La biblioteca async-pool que otros ya han sugerido es probablemente una mejor alternativa a mi implementación, ya que funciona casi de manera idéntica y tiene una implementación más concisa con un uso inteligente de Promise.race (): https://github.com/rxaviers/ async-pool / blob / master / lib / es7.js

Espero que mi respuesta pueda tener un valor educativo.

Adelost
fuente
1

Aquí va un ejemplo básico para streaming y 'p-limit'. Transmite http read stream a mongo db.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;


const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

async yrAsyncStreamingFunction(readStream) => {
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => {
                let someDataPromise = limit(() => yr_async_call_to_somewhere())

                    someDataPromise.then(
                        function handleResolve(someData) {

                            data.someData = someData;    
                            done(null, data);
                        },
                        function handleError(error) {
                            done(error)
                        }
                    );
                })

            await pipeline(
                readStream,
                JSONStream.parse('*'),
                mapperStream,
                mongoWriteStream
            );
        }
gosuer1921
fuente
0

Así que traté de hacer que algunos ejemplos que se muestran funcionen para mi código, pero como esto era solo para un script de importación y no para un código de producción, usar el paquete npm batch-promises seguramente fue la ruta más fácil para mí

NOTA: Requiere tiempo de ejecución para admitir Promise o para ser polietileno.

Api batchPromises (int: batchSize, array: Collection, i => Promise: Iteratee) La Promesa: Iteratee se llamará después de cada lote.

Utilizar:

batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

Agusti Fernandez Pardo
fuente
0

La recursividad es la respuesta si no desea utilizar bibliotecas externas

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function(){
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      }
    });
  }

  return tracker(0); 
}
Juan
fuente
0

Esto es lo que hice usando Promise.race, dentro de mi código aquí

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
}

Si quieres ver un ejemplo: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

Alex
fuente
2
Yo no llamaría a eso simultaneidad ... Eso es más como ejecución por lotes ... Usted hace 4 tareas, espera a que todas terminen y luego hace las siguientes 4. Si una de ellas se resuelve antes, todavía espera a que las otras 3 terminen , lo que debería usar esPromise.race
sin fin
0
  • @tcoocLa respuesta de fue bastante buena. No lo sabía y lo aprovecharé en el futuro.
  • También disfruté la respuesta de @MatthewRideout , ¡pero usa una biblioteca externa!

Siempre que sea posible, intento desarrollar este tipo de cosas por mi cuenta, en lugar de buscar una biblioteca. Terminas aprendiendo muchos conceptos que antes parecían abrumadores.

¿Qué piensan ustedes de este intento?
(Lo pensé mucho y creo que está funcionando, pero señalen si no es así o si hay algo fundamentalmente mal)

 class Pool{
        constructor(maxAsync) {
            this.maxAsync = maxAsync;
            this.asyncOperationsQueue = [];
            this.currentAsyncOperations = 0
        }

        runAnother() {
            if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
                this.currentAsyncOperations += 1;
                this.asyncOperationsQueue.pop()()
                    .then(() => { this.currentAsyncOperations -= 1; this.runAnother() }, () => { this.currentAsyncOperations -= 1; this.runAnother() })
            }
        }

        add(f){  // the argument f is a function of signature () => Promise
            this.runAnother();
            return new Promise((resolve, reject) => {
                this.asyncOperationsQueue.push(
                    () => f().then(resolve).catch(reject)
                )
            })
        }
    }

//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (fail) {
               reject(`Error for id ${id}`);
            } else {
                resolve(id);
            }
        }, timeout)
    }
    )
}


const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);


const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

Este enfoque proporciona una buena API, similar a los grupos de subprocesos en scala / java.
Después de crear una instancia del grupo con const cappedPool = new Pool(2), le promete simplemente cappedPool.add(() => myPromise).
Sin darnos cuenta debemos asegurarnos de que la promesa no se inicie de inmediato y por eso debemos "brindarla perezosamente" con la ayuda de la función.

Lo más importante add es que tenga en cuenta que el resultado del método es una Promesa que se completará / resolverá con el valor de su promesa original . Esto lo convierte en un uso muy intuitivo.

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
   // Do something with the result form the DB
  }
)
Carlos Teixeira
fuente
0

Desafortunadamente, no hay forma de hacerlo con el Promise.all nativo, por lo que debes ser creativo.

Esta es la forma más rápida y concisa que pude encontrar sin usar bibliotecas externas.

Hace uso de una característica de JavaScript más nueva llamada iterador. Básicamente, el iterador realiza un seguimiento de los elementos que se han procesado y los que no.

Para usarlo en el código, crea una matriz de funciones asíncronas. Cada función asincrónica solicita al mismo iterador el siguiente elemento que debe procesarse. Cada función procesa su propio elemento de forma asincrónica y, cuando termina, solicita al iterador uno nuevo. Una vez que el iterador se queda sin elementos, todas las funciones se completan.

Gracias a @Endless por la inspiración.

var items = [
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
    "https://www.stackoverflow.com",
];

var concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => {
    for(let [index, url] of cursor){
        console.log("getting url is ", index, url);
        // run your async task instead of this next line
        var text = await fetch(url).then(res => res.text());
        console.log("text is", text.slice(0,20));
    }
})

usuario3413723
fuente
Tengo curiosidad por saber por qué se anotó esto. Es muy similar a lo que se me ocurrió.
Kris Oye
0

Tantas buenas soluciones. Comencé con la elegante solución publicada por @Endless y terminé con este pequeño método de extensión que no usa bibliotecas externas ni se ejecuta en lotes (aunque se supone que tiene características como async, etc.):

Promise.allWithLimit = async (taskList, limit = 5) => {
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => {
            try {
                let entry = iterator.next();
                while (!entry.done) {
                    let [index, promise] = entry.value;
                    try {
                        results[index] = await promise;
                        entry = iterator.next();
                    }
                    catch (err) {
                        results[index] = err;
                    }
                }
                // No more work to do
                resolve(true); 
            }
            catch (err) {
                // This worker is dead
                reject(err);
            }
        }));

    await Promise.all(workerThreads);
    return results;
};

    Promise.allWithLimit = async (taskList, limit = 5) => {
        const iterator = taskList.entries();
        let results = new Array(taskList.length);
        let workerThreads = new Array(limit).fill(0).map(() => 
            new Promise(async (resolve, reject) => {
                try {
                    let entry = iterator.next();
                    while (!entry.done) {
                        let [index, promise] = entry.value;
                        try {
                            results[index] = await promise;
                            entry = iterator.next();
                        }
                        catch (err) {
                            results[index] = err;
                        }
                    }
                    // No more work to do
                    resolve(true); 
                }
                catch (err) {
                    // This worker is dead
                    reject(err);
                }
            }));
    
        await Promise.all(workerThreads);
        return results;
    };

    const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
       let n = (i + 1) * 5;
       setTimeout(() => {
          console.log(`Did nothing for ${n} seconds`);
          resolve(n);
       }, n * 1000);
    }));

    var results = Promise.allWithLimit(demoTasks);

Kris Oye
fuente