Consuma el mismo mensaje nuevamente si falla el procesamiento del mensaje

10

Estoy usando la versión 1.3.0 del cliente Confluent.Kafka .NET. Estoy siguiendo los documentos :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

El problema es que incluso si comento la línea consumer.StoreOffset(consumerResult);, sigo recibiendo el siguiente mensaje no consumido la próxima vez que Consumo , es decir, el desplazamiento sigue aumentando, lo que no parece ser lo que la documentación afirma que es, es decir, al menos una entrega .

Incluso si configuro EnableAutoCommit = falsey elimino 'EnableAutoOffsetStore = false' de la configuración, y lo reemplazo consumer.StoreOffset(consumerResult)con consumer.Commit(), todavía veo el mismo comportamiento, es decir, incluso si comento el Commit, sigo recibiendo los siguientes mensajes no consumidos.

Siento que me falta algo fundamental aquí, pero no puedo entender qué. Cualquier ayuda es apreciada!

havij
fuente
Los mensajes ya se han devuelto a la aplicación desde el punto de vista de kafka, por lo que cuando se confirman se guardan como las últimas compensaciones confirmadas, pero el consumo continuará devolviendo los siguientes mensajes, ya sea que los haya consumido o no. ¿Cuál es tu expectativa aquí? ¿Podría por favor elaborar lo que espera que suceda antes / después de confirmar y consumir?
Sagar Veeram
Los mensajes no se vuelven a buscar hasta que utilice la búsqueda para compensar. Esto afectará el consumo y los mensajes serán devueltos desde el desplazamiento de búsqueda.
Sagar Veeram
@ user2683814 En mi publicación mencioné dos escenarios dependiendo de lo que EnableAutoCommitesté configurado. Digamos que sí EnableAutoCommit = false, y cuando Consumerecibo el mensaje con el desplazamiento 11. Esperaba seguir recibiendo el mismo mensaje con el desplazamiento 11 una y otra vez si el procesamiento del mensaje continúa y, por lo tanto, no se llama Commit.
Havij
No, ese no es el caso. No puede controlar qué sondear ( Consume) usando Commitdespués de que ya tiene Subscribeun tema. Kafka (como en la biblioteca del cliente) detrás de la escena mantiene todos los desplazamientos que ha enviado a la aplicación Consumey los enviará linealmente. Entonces, para volver a procesar un mensaje como en un escenario de falla, debe rastrearlo en su código y buscar compensarlo y comenzar a procesar el mensaje y también debe saber qué omitir si ya se ha procesado en solicitudes anteriores. No estoy familiarizado con la biblioteca .net, pero realmente no debería importar, ya que este es el diseño kafka.
Sagar Veeram
Creo que debe usar la combinación de suscribirse y asignar y puede necesitar diferentes consumidores para respaldar su caso de uso. En caso de fallas, use asignar / buscar compensar las particiones de tema con un consumidor para reprocesar mensajes y para el procesamiento normal, use otro consumidor con flujo de suscripción / Consumo / Compromiso.
Sagar Veeram

Respuestas:

0

Es posible que desee tener una lógica de reintento para procesar cada uno de sus mensajes por un número fijo de veces, como por ejemplo 5. Si no tiene éxito durante estos 5 reintentos, es posible que desee agregar este mensaje a otro tema para manejar todos mensajes fallidos que tienen prioridad sobre su tema real. O es posible que desee agregar el mensaje fallido al mismo tema para que sea recogido más tarde una vez que se hayan consumido todos esos otros mensajes.

Si el procesamiento de cualquier mensaje es exitoso dentro de esos 5 reintentos, puede pasar al siguiente mensaje en la cola.

Raju Dasupally
fuente