Tengo un servidor Node que crea un proceso secundario con el fork()
uso de IPC. En algún momento, el niño envía los resultados al padre a aproximadamente 10Hz como parte de una tarea de larga duración. Cuando la carga útil que se pasa process.send()
es pequeña, todo funciona bien: cada mensaje que envío se recibe ~ inmediatamente y el padre lo procesa.
Sin embargo, cuando la carga útil es 'grande' (no he determinado el límite de tamaño exacto) en lugar de ser recibida de inmediato por el padre, todas las cargas se envían primero, y solo una vez que el niño realiza su tarea de larga duración, el padre recibe y procesar los mensajes.
tl; dr visual:
Bueno (sucede con poca carga útil):
child: send()
parent: receive()
child: send()
parent: receive()
child: send()
parent: receive()
...
Malo (sucede con gran carga útil):
child: send()
child: send()
child: send()
(repeat many times over many seconds)
...
parent: receive()
parent: receive()
parent: receive()
parent: receive()
...
- ¿Es esto un error? (Editar: el comportamiento solo ocurre en OS X, no en Windows o Linux)
- ¿Hay alguna forma de evitar esto, aparte de tratar de mantener pequeña mi carga útil de IPC?
Edición 2 : el siguiente código de muestra utiliza el contador de tiempo y de iteración para seleccionar cuándo enviar una actualización. (En mi código actual también es posible enviar una actualización después de n iteraciones, o después de que el ciclo logre ciertos resultados). Como tal, reescribir el código para usar setInterval
/ en setTimeout
lugar de un ciclo es un último recurso para mí, ya que me requiere para eliminar características.
Editar : Aquí hay un código de prueba que reproduce el problema. Sin embargo, solo se reproduce en OS X, no en Windows o Linux:
server.js
const opts = {stdio:['inherit', 'inherit', 'inherit', 'ipc']};
const child = require('child_process').fork('worker.js', [], opts);
child.on('message', msg => console.log(`parent: receive() ${msg.data.length} bytes`, Date.now()));
require('http').createServer((req, res) => {
console.log(req.url);
const match = /\d+/.exec(req.url);
if (match) {
child.send(match[0]*1);
res.writeHead(200, {'Content-Type':'text/plain'});
res.end(`Sending packets of size ${match[0]}`);
} else {
res.writeHead(404, {'Content-Type':'text/plain'});
res.end('what?');
}
}).listen(8080);
trabajador.js
if (process.send) process.on('message', msg => run(msg));
function run(messageSize) {
const msg = new Array(messageSize+1).join('x');
let lastUpdate = Date.now();
for (let i=0; i<1e7; ++i) {
const now = Date.now();
if ((now-lastUpdate)>200 || i%5000==0) {
console.log(`worker: send() > ${messageSize} bytes`, now);
process.send({action:'update', data:msg});
lastUpdate = Date.now();
}
Math.sqrt(Math.random());
}
console.log('worker done');
}
Aproximadamente alrededor de 8k el problema ocurre. Por ejemplo, al consultar http://localhost:8080/15
vshttp://localhost:8080/123456
/15
worker: send() > 15 bytes 1571324249029
parent: receive() 15 bytes 1571324249034
worker: send() > 15 bytes 1571324249235
parent: receive() 15 bytes 1571324249235
worker: send() > 15 bytes 1571324249436
parent: receive() 15 bytes 1571324249436
worker done
/123456
worker: send() > 123456 bytes 1571324276973
worker: send() > 123456 bytes 1571324277174
worker: send() > 123456 bytes 1571324277375
child done
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277391
parent: receive() 123456 bytes 1571324277393
Experimentado en Nodo v12.7 y v12.12.
fuente
setInterval()
?run()
tiene unwhile
bucle? ¿Estás sugiriendo que cambiar eso asetInterval()
resolverá mi problema? Para responder a la pregunta, creo que se está preguntando: uso unwhile
bucle porque esa función es el único propósito de este proceso de trabajo y (con pequeñas cargas útiles de IPC) no causó ningún problema que pudiera ver.setInterval()
libera el ciclo de eventos para realizar E / S en segundo plano. No estoy diciendo que definitivamente resolverá este problema, pero parece una elección extraña escribirlo de la manera que lo ha hecho, solo porque puede hacerlo.setTimeout()
osetInterval()
. El cambio aquí es trivial.Respuestas:
Tener un ciclo de larga duración y bloqueo en combinación con sockets o descriptores de archivos en el nodo siempre es una indicación de que algo está mal hecho.
Sin poder probar toda la configuración, es difícil saber si mi afirmación es realmente correcta, pero los mensajes cortos probablemente se pueden pasar directamente en un fragmento al sistema operativo que luego lo pasa al otro proceso. Con el nodo de mensajes más grandes, debería esperar hasta que el sistema operativo pueda recibir más datos, por lo que el envío se pone en cola y, dado que tiene un bloqueo,
while
el envío es una cola hasta que finalice el tiempoloop
.Entonces, para su pregunta, no es que no sea un error.
A medida que usa una versión reciente de nodejs, usaría una
await
y, enasync
lugar de crear una no bloqueantewhile
similar a lasleep
de esta respuesta . Elawait
permitirá que el bucle de eventos nodo para interceptar siprocessSome
vuelve a la espera de Promise.Para su código que realmente no refleja un caso de uso real, es difícil saber cómo resolverlo correctamente. Si no hace nada asíncrono
processSome
que permita que la E / S intercepte, entonces debe hacerlo manualmente de forma regular, por ejemplo con aawait new Promise(setImmediate);
.fuente
processSome()
código fuera delwhile
bucle. (O tal vez me estoy perdiendo algo crucial relacionado con las promesas.)process.send({action:'update', data:status()});
Actualicé la respuesta para que se ejecute cuandoevery10Hz
es cierto yprocessSome
para cada iteración dewhile
. Elawait
debe permitir que el EvenLoop de nodo para interceptar incluso siprocessSome
es no devuelve una promesa. Pero la razón de su problema sigue siendo que el ciclo está bloqueando.processSome()
no devuelve una promesa, entonces este enfoque aún bloquea las E / S (las microtasks como la continuación producida por estaawait
declaración se procesan antes de IO). Además, esto hará que la iteración funcione mucho más lentamente debido a la microtask en cola en cada iteración.En cuanto a tu primera pregunta
Esto definitivamente no es un error y podría reproducirlo en mi Windows 10 (para el tamaño 123456). Se debe principalmente al almacenamiento intermedio del núcleo subyacente y al cambio de contexto por parte del sistema operativo, ya que dos procesos separados (no separados) se comunican a través de un descriptor IPC.
En cuanto a tu segunda pregunta
Si entiendo el problema correctamente, está tratando de resolver, para cada solicitud http, cada vez que el trabajador devuelve un fragmento al servidor, desea que el servidor lo procese antes de obtener el siguiente fragmento. Así es como entiendo cuando dijiste procesamiento de sincronización
Hay una manera de usar promesas, pero me gustaría usar generadores en los trabajadores. Es mejor orquestar el flujo a través del servidor y el trabajador
Fluir:
server.js
trabajador.js
Si conocemos el caso de uso exacto, podría haber mejores formas de programarlo. Esta es solo una forma de sincronizar el proceso secundario que retiene gran parte de su código fuente original.
fuente
Si necesita garantizar que se reciba un mensaje antes de enviar el siguiente, puede esperar a que el maestro confirme la recepción. Esto retrasará el envío del siguiente mensaje, por supuesto, pero dado que su lógica se basa tanto en el tiempo como en el número de iteración para determinar si se debe enviar un mensaje, entonces puede estar bien para su caso.
La implementación necesitará que cada trabajador cree una promesa para cada mensaje enviado y espere una respuesta del maestro antes de resolver la promesa. Esto también significa que debe identificar qué mensaje se reconoce en función de una identificación de mensaje o algo único si tiene más de un mensaje o trabajador simultáneamente.
aquí está el código modificado
server.js
trabajador.js
PD: No probé el código, por lo que podría necesitar algunos ajustes, pero la idea debería ser válida.
fuente
Si bien estoy de acuerdo con otros en que la solución óptima sería aquella en la que el proceso secundario puede ceder voluntariamente el control al final de cada ciclo, lo que permite que se ejecuten los procesos de descarga del búfer, hay una solución fácil / rápida / sucia que lo hace casi sincrónico comportamiento, y eso es hacer que el niño
send
llame al bloqueo.Use lo mismo
server.js
que antes, y casi lo mismoworker.js
, con solo una línea agregada:trabajador.js
Salida:
fuente