Confundido cuando el método de ejecución boost :: asio :: io_service bloquea / desbloquea

88

Siendo un principiante total en Boost.Asio, estoy confundido con io_service::run(). Agradecería que alguien me explicara cuándo se bloquea / desbloquea este método. La documentación dice:

La run()función se bloquea hasta que todo el trabajo haya terminado y no haya más controladores que enviar, o hasta que io_servicese haya detenido.

Varios subprocesos pueden llamar a la run()función para configurar un grupo de subprocesos desde los cuales io_servicepueden ejecutar controladores. Todos los subprocesos que están esperando en el grupo son equivalentes y io_servicepueden elegir cualquiera de ellos para invocar un controlador.

Una salida normal de la run()función implica que el io_serviceobjeto se detiene (la stopped()función devuelve verdadero). Las llamadas posteriores a run(), run_one(), poll()o poll_one()volverán de inmediato a menos que haya una llamada antes reset().

¿Qué significa la siguiente declaración?

[...] no se enviarán más manipuladores [...]


Mientras trataba de comprender el comportamiento de io_service::run(), encontré este ejemplo (ejemplo 3a). Dentro de ella, observo que io_service->run()bloquea y espera órdenes de trabajo.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

Sin embargo, en el siguiente código en el que estaba trabajando, el cliente se conecta mediante TCP / IP y el método de ejecución se bloquea hasta que los datos se reciben de forma asincrónica.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Se run()agradecería cualquier explicación de que describa su comportamiento en los dos ejemplos siguientes.

MistyD
fuente

Respuestas:

234

Fundación

Comencemos con un ejemplo simplificado y examinemos las piezas relevantes de Boost.Asio:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

¿Qué es un manejador ?

Un controlador no es más que una devolución de llamada. En el código de ejemplo, hay 3 controladores:

  • El printmanejador (1).
  • El handle_async_receivemanejador (3).
  • El printmanejador (4).

Aunque la misma print()función se utiliza dos veces, se considera que cada uso crea su propio controlador identificable de forma única. Los controladores pueden tener muchas formas y tamaños, que van desde funciones básicas como las anteriores hasta construcciones más complejas como functores generados desde boost::bind()y lambdas. Independientemente de la complejidad, el controlador sigue siendo nada más que una devolución de llamada.

¿Qué es el trabajo ?

El trabajo es un procesamiento que se ha solicitado a Boost.Asio en nombre del código de la aplicación. A veces, Boost.Asio puede comenzar parte del trabajo tan pronto como se le haya informado, y otras veces puede esperar para hacer el trabajo en un momento posterior. Una vez finalizado el trabajo, Boost.Asio informará a la aplicación invocando el controlador suministrado .

Boost.Asio garantiza que los controladores pueden funcionar sólo dentro de un hilo que está llamando actualmente run(), run_one(), poll(), o poll_one(). Estos son los hilos que funcionarán y llamarán a los controladores . Por lo tanto, en el ejemplo anterior, print()no se invoca cuando se publica en io_service(1). En su lugar, se agrega a io_servicey se invocará en un momento posterior. En este caso, está dentro de io_service.run()(5).

¿Qué son las operaciones asincrónicas?

Una operación asincrónica crea trabajo y Boost.Asio invocará un controlador para informar a la aplicación cuando el trabajo se haya completado. Las operaciones asincrónicas se crean llamando a una función que tiene un nombre con el prefijo async_. Estas funciones también se conocen como funciones de inicio .

Las operaciones asincrónicas se pueden descomponer en tres pasos únicos:

  • Iniciar, o informar, el asociado io_serviceque trabaja debe hacerse. La async_receiveoperación (3) informa al io_serviceque necesitará leer datos de forma asincrónica desde el socket, luego async_receiveregresa inmediatamente.
  • Haciendo el trabajo real. En este caso, cuando socketreciba datos, se leerán y copiarán bytes buffer. El trabajo real se realizará en:
    • La función de inicio (3), si Boost.Asio puede determinar que no bloqueará.
    • Cuando la aplicación ejecuta explícitamente io_service(5).
  • Invocando el handle_async_receive ReadHandler . Una vez más, los controladores solo se invocan dentro de los subprocesos que ejecutan io_service. Así, independientemente de cuándo se realice el trabajo (3 o 5), se garantiza que handle_async_receive()solo se invocará dentro de io_service.run()(5).

La separación en el tiempo y el espacio entre estos tres pasos se conoce como inversión de flujo de control. Es una de las complejidades que dificulta la programación asincrónica. Sin embargo, existen técnicas que pueden ayudar a mitigar esto, como el uso de corrutinas .

¿Qué hace io_service.run()?

Cuando un hilo llama io_service.run(), el trabajo y los controladores se invocarán desde dentro de este hilo. En el ejemplo anterior, io_service.run()(5) se bloqueará hasta que:

  • Ha invocado y regresado de ambos printcontroladores, la operación de recepción se completa con éxito o falla, y su handle_async_receivecontrolador ha sido invocado y devuelto.
  • El io_servicese detiene explícitamente a través de io_service::stop().
  • Se lanza una excepción desde dentro de un controlador.

Un posible flujo pseudo-ish podría describirse como el siguiente:

crear io_service
crear socket
agregar controlador de impresión a io_service (1)
espere a que el enchufe se conecte (2)
agregue una solicitud de trabajo de lectura asincrónica a io_service (3)
agregar controlador de impresión a io_service (4)
ejecutar el io_service (5)
  hay trabajo o manipuladores?
    sí, hay 1 trabajo y 2 manipuladores
      ¿Socket tiene datos? no, no hagas nada
      ejecutar controlador de impresión (1)
  hay trabajo o manipuladores?
    sí, hay 1 trabajo y 1 manejador
      ¿Socket tiene datos? no, no hagas nada
      ejecutar controlador de impresión (4)
  hay trabajo o manipuladores?
    si, hay 1 trabajo
      ¿Socket tiene datos? no sigue esperando
  - socket recibe datos -
      socket tiene datos, léalo en el búfer
      agregue el controlador handle_async_receive a io_service
  hay trabajo o manipuladores?
    sí, hay 1 controlador
      ejecutar handle_async_receive handler (3)
  hay trabajo o manipuladores?
    no, establezca io_service como detenido y vuelva

Observe cómo cuando terminó la lectura, agregó otro controlador al io_service. Este sutil detalle es una característica importante de la programación asincrónica. Permite encadenar a los manipuladores . Por ejemplo, si handle_async_receiveno obtuvo todos los datos que esperaba, entonces su implementación podría publicar otra operación de lectura asincrónica, lo que resultaría en io_servicetener más trabajo y, por lo tanto, no regresar io_service.run().

Ten en cuenta que cuando la io_servicecuenta se quedó sin trabajo, la aplicación debe reset()al io_serviceantes de ejecutar de nuevo.


Pregunta de ejemplo y código de ejemplo 3a

Ahora, examinemos las dos piezas de código a las que se hace referencia en la pregunta.

Código de pregunta

socket->async_receiveagrega trabajo al io_service. Por lo tanto, io_service->run()se bloqueará hasta que la operación de lectura se complete con éxito o error, y ClientReceiveEventhaya terminado de ejecutarse o arroje una excepción.

Ejemplo 3a Código

Con la esperanza de que sea más fácil de entender, aquí hay un Ejemplo 3a anotado más pequeño:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

En un nivel alto, el programa creará 2 subprocesos que procesarán el io_servicebucle de eventos de (2). Esto da como resultado un grupo de subprocesos simple que calculará los números de Fibonacci (3).

La única diferencia principal entre el Código de preguntas y este código es que este código invoca io_service::run()(2) antes de que se agreguen el trabajo real y los controladores a io_service(3). Para evitar que io_service::run()vuelva inmediatamente, io_service::workse crea un objeto (1). Este objeto evita que se io_servicequede sin trabajo; por lo tanto, io_service::run()no regresará por no haber trabajado.

El flujo general es el siguiente:

  1. Cree y agregue el io_service::workobjeto agregado al io_service.
  2. Grupo de subprocesos creado que invoca io_service::run(). Estos subprocesos de trabajo no volverán io_servicedebido al io_service::workobjeto.
  3. Agregue 3 controladores que calculan números de Fibonacci al io_service, y regrese inmediatamente. Los subprocesos de trabajo, no el subproceso principal, pueden comenzar a ejecutar estos controladores de inmediato.
  4. Elimina el io_service::workobjeto.
  5. Espere a que terminen de ejecutarse los subprocesos de trabajo. Esto solo ocurrirá una vez que los 3 controladores hayan finalizado la ejecución, ya que io_serviceni tienen controladores ni trabajo.

El código podría escribirse de manera diferente, de la misma manera que el Código original, donde se agregan controladores al io_service, y luego io_servicese procesa el bucle de eventos. Esto elimina la necesidad de usar io_service::worky da como resultado el siguiente código:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Sincrónico frente a asincrónico

Aunque el código de la pregunta utiliza una operación asincrónica, está funcionando efectivamente de forma sincrónica, ya que está esperando a que se complete la operación asincrónica:

socket.async_receive(buffer, handler)
io_service.run();

es equivalente a:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

Como regla general, trate de evitar mezclar operaciones sincrónicas y asincrónicas. A menudo, puede convertir un sistema complejo en un sistema complicado. Esta respuesta destaca las ventajas de la programación asincrónica, algunas de las cuales también se tratan en la documentación de Boost.Asio .

Tanner Sansbury
fuente
13
Publicación impresionante. Me gustaría agregar solo una cosa porque creo que no recibe suficiente atención: después de que run () haya regresado, debe llamar a reset () en su io_service antes de poder ejecutarlo () nuevamente. De lo contrario, puede regresar instantáneamente si hay o no operaciones async_ esperando o no.
DeVadder
¿De dónde viene el búfer? ¿Qué es?
ruipacheco
Todavía estoy confundido Si la mezcla es sincronizada y no se recomienda asincrónica, ¿cuál es el modo asincrónico puro? ¿Puede dar un ejemplo que muestre el código sin io_service.run () ;?
Splash
@Splash One se puede utilizar io_service.poll()para procesar el bucle de eventos sin bloquear las operaciones pendientes. La recomendación principal para evitar mezclar operaciones sincrónicas y asincrónicas es evitar agregar complejidad innecesaria y evitar una respuesta deficiente cuando los controladores tardan mucho en completarse. Hay algunos casos en los que es seguro, como cuando uno sabe que la operación síncrona no se bloqueará.
Tanner Sansbury
¿Qué quieres decir con "actualmente" en "Boost.Asio garantiza que los controladores solo se ejecutarán dentro de un hilo que está llamando actualmenterun() ..." ? Si hay N hilos (que ha llamado run()), ¿cuál es el hilo "actual"? ¿Puede haber muchos? ¿O te refieres al hilo que ha terminado de ejecutar async_*()(digamos async_read), también está garantizado que llamará a sus controladores?
Nawaz
18

Para simplificar cómo lo runhace, piense en él como un empleado que debe procesar una pila de papel; toma una hoja, hace lo que dice la hoja, tira la hoja y toma la siguiente; cuando se le acaban las sábanas, sale de la oficina. En cada hoja puede haber cualquier tipo de instrucción, incluso agregando una nueva hoja al montón. Volviendo a asio: puedes dar a una io_serviceobra de dos formas, esencialmente: usándola postcomo en la muestra que vinculaste, o usando otros objetos que llaman internamente posta io_service, como a sockety sus async_*métodos.

Loghorn
fuente