Estoy usando la versión 1.3.0 del cliente Confluent.Kafka .NET. Estoy siguiendo los documentos :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
El problema es que incluso si comento la línea consumer.StoreOffset(consumerResult);
, sigo recibiendo el siguiente mensaje no consumido la próxima vez que Consumo , es decir, el desplazamiento sigue aumentando, lo que no parece ser lo que la documentación afirma que es, es decir, al menos una entrega .
Incluso si configuro EnableAutoCommit = false
y elimino 'EnableAutoOffsetStore = false' de la configuración, y lo reemplazo consumer.StoreOffset(consumerResult)
con consumer.Commit()
, todavía veo el mismo comportamiento, es decir, incluso si comento el Commit
, sigo recibiendo los siguientes mensajes no consumidos.
Siento que me falta algo fundamental aquí, pero no puedo entender qué. Cualquier ayuda es apreciada!
EnableAutoCommit
esté configurado. Digamos que síEnableAutoCommit = false
, y cuandoConsume
recibo el mensaje con el desplazamiento 11. Esperaba seguir recibiendo el mismo mensaje con el desplazamiento 11 una y otra vez si el procesamiento del mensaje continúa y, por lo tanto, no se llamaCommit
.Consume
) usandoCommit
después de que ya tieneSubscribe
un tema. Kafka (como en la biblioteca del cliente) detrás de la escena mantiene todos los desplazamientos que ha enviado a la aplicaciónConsume
y los enviará linealmente. Entonces, para volver a procesar un mensaje como en un escenario de falla, debe rastrearlo en su código y buscar compensarlo y comenzar a procesar el mensaje y también debe saber qué omitir si ya se ha procesado en solicitudes anteriores. No estoy familiarizado con la biblioteca .net, pero realmente no debería importar, ya que este es el diseño kafka.Respuestas:
Lo siento, no puedo agregar comentarios todavía. El consumidor de Kafka consume el mensaje en lotes, por lo que tal vez aún repita el lote previamente captado por el hilo de fondo .
Puede verificar si su consumidor realmente comete una compensación o no con kafka util
kafka-consumer-groups.sh
fuente
Es posible que desee tener una lógica de reintento para procesar cada uno de sus mensajes por un número fijo de veces, como por ejemplo 5. Si no tiene éxito durante estos 5 reintentos, es posible que desee agregar este mensaje a otro tema para manejar todos mensajes fallidos que tienen prioridad sobre su tema real. O es posible que desee agregar el mensaje fallido al mismo tema para que sea recogido más tarde una vez que se hayan consumido todos esos otros mensajes.
Si el procesamiento de cualquier mensaje es exitoso dentro de esos 5 reintentos, puede pasar al siguiente mensaje en la cola.
fuente