¿Cómo puedo asegurarme de que un trabajo no se ejecute dos veces en Bull?

11

Tengo dos funciones, scheduleScan()y scan().

scan()llama scheduleScan() cuando no hay nada más que hacer, excepto programar una nueva exploración , por lo que scheduleScan()puede programar una scan(). Pero hay un problema, algunos trabajos se ejecutan dos veces.

Quiero asegurarme de que solo se esté procesando un trabajo en un momento dado. ¿Cómo puedo lograr eso? Creo que tiene algo que ver con done()(estaba en scan (), eliminado ahora) pero no pude encontrar una solución.

Versión Bull: 3.12.1

Edición tardía importante: scan() llama a otras funciones y pueden o no llamar a otras funciones, pero todas son funciones de sincronización, por lo que solo llaman a una función cuando se completan sus propios trabajos, solo hay una forma de avanzar. Al final del "árbol", lo llamo, la última función llama a scheduleScan (), pero no puede haber dos trabajos simultáneos en ejecución. Cada trabajo comienza en scan(), por cierto, y termina conscheduleScan(stock, period, milliseconds, 'called by file.js')

export function update(job) {
  // does some calculations, then it may call scheduleScan() or
  // it may call another function, and that could be the one calling
  // scheduleScan() function.
  // For instance, a function like finalize()
}

export function scan(job) {
  update(job)
}


import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)

queue.process(1, (job) => {
  job.progress(100).then(() => {
    scan(job)
  })
})

export function scheduleScan (stock, period, milliseconds, triggeredBy) {
  let uniqueId = stringHash(stock + ':' + period)

  queue.getJob(uniqueId).then(job => {
    if (!job) {
      if (milliseconds) {
        queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
          // console.log('Added with ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      } else {
        queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
          // console.log('Added without ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      }
    } else {
      job.getState().then(state => {
        if (state === 'completed') {
          job.remove().then(() => {
            if (milliseconds) {
              queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
                // console.log('Added with ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            } else {
              queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
                // console.log('Added without ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            }
          }).catch(err => {
            if (err) {
              // console.log(err)
            }
          })
        }
      }).catch(err => {
        // console.log(err)
      })
    }
  })
}
salep
fuente
No puedo encontrar la scanfunción, ¿pueden ayudarme?
Muhammad Zeeshan
@MuhammadZeeshan Lo agregué, mi error.
Salep

Respuestas:

6

El problema, creo es que su scanfunción es asíncrona. Entonces su job.progressfunción llama scany luego llama inmediatamentedone permitiendo que la cola procese otro trabajo.

Una solución podría ser pasar la donedevolución de llamada como parámetro a su scanyscheduleScan funciones, e invocar, una vez que haya completado su trabajo (o en caso de error).

Otra (mejor) solución podría ser asegurarse de que siempre devuelva un Promisedesde scany scheduleScanluego esperar la promesa de resolver y luego llamar done. Si hace esto, asegúrese de encadenar todos los retornos de su promesa en su scheduleScanfunción.

queue.process(1, (job, done) => {
  job.progress(100).then(() => {
    scan(job)
        .then(done)
        .catch(done)
  })
})

export function scan() {
   // business logic
   return scheduleScan()
}

// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
    return queue.getJob(..).then(() => {
        ....
        return queue.add()...
        ....
        return queue.add(...)
            .catch(e => {
                 console.log(e);
                 // propogate errors!
                 throw e;
             })

}
jeeves
fuente
He editado mi pregunta, ¿puede revisarla nuevamente, especialmente la parte "Edición tardía importante"? ¿Su respuesta aún se aplica en esta situación? Gracias.
Salep
1
Sí, sigue siendo válido. Desde su edición, creo que está diciendo scheduledScanque siempre se llama después de todas las demás funciones de sincronización scan. Si este es el caso, entonces sí, mi respuesta sigue siendo válida. Simplemente siempre devuelva la promesa que se devolverá scheduleScanen la scanfunción
jeeves
De nuevo, mi error. La primera función, update (), está en exploración, pero update () puede llamar a otra función como finalize () y finalize () puede llamar a scheduleScan (). Tenga en cuenta que esto sucede en un orden, por lo que no hay varias llamadas, estoy haciendo esto para mantener mi aplicación modular. - Gracias
salep
1
Sí, la misma respuesta. Si updatellamadas scheduledScano cualquier número de funciones entre ellos. El punto clave es que debe devolver la cadena de promesa desde scheduleScanel principio hasta la scanfunción. Entonces, si scanllama a updatequé llamadas finalise..... Qué llamadas a scheduleScanla cadena de promesa deberán devolverse a través de todas las invocaciones de funciones, es decir, solo asegúrese de devolver la promesa de cada una de estas funciones.
jeeves
Tan solo para aclarar mi último comentario. Por ejemplo, si dentro de la exploración llama a actualización. Debe devolver el resultado de la actualización (una promesa) de la función de escaneo.
jeeves
4

La función de escaneo es una función asincrónica. En su queue.process()función, debe esperar la función de escaneo y luego llamar a la done()devolución de llamada.

export async function scan(job) {
  // it does some calculations, then it creates a new schedule.
  return scheduleScan(stock, period, milliseconds, "scan.js");
}

queue.process(1, (job, done) => {
  job.progress(100).then(async() => {
    await scan(job);
    done();
  });
});

export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
    let uniqueId = stringHash(stock + ":" + period);
    try {
      const existingJob = await queue.getJob(uniqueId);
      if (!existingJob) {
        const job = await addJob({
          queue,
          stock,
          period,
          uniqueId,
          milliseconds,
          triggeredBy
        });
        return job;
      } else {
        const jobState = await existingJob.getState();
        if (jobState === "completed") {
          await existingJob.remove();
          const newJob = await addJob({
            queue,
            stock,
            period,
            uniqueId,
            milliseconds,
            triggeredBy
          });
          return newJob;
        }
      }
    } catch (err) {
      throw new Error(err);
    }
}

export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
  if (milliseconds) {
    return queue.add(
      { stock, period, triggeredBy },
      { delay: milliseconds, jobId: uniqueId }
    );
  } else {
    return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
  }
}

¡Prueba esto! Intenté refactorizar un poco el código usando async-await.

Adithya Sreyaj
fuente
He editado mi pregunta, ¿puede revisarla nuevamente, especialmente la parte "Edición tardía importante"? ¿Su respuesta aún se aplica en esta situación? Gracias.
Salep