¿Qué determina la compensación del consumidor de Kafka?

170

Soy relativamente nuevo en Kafka. He experimentado un poco con él, pero algunas cosas no me quedan claras con respecto a la compensación del consumidor. Por lo que he entendido hasta ahora, cuando un consumidor comienza, el desplazamiento desde el que comenzará a leer está determinado por la configuración auto.offset.reset(corríjame si me equivoco).

Ahora, por ejemplo, digamos que hay 10 mensajes (compensaciones de 0 a 9) en el tema, y ​​un consumidor consumió 5 de ellos antes de que cayera (o antes de que matara al consumidor). Luego diga que reinicio ese proceso del consumidor. Mis preguntas son:

  1. Si auto.offset.resetse establece en smallest, ¿siempre comenzará a consumir desde el desplazamiento 0?

  2. Si auto.offset.resetse establece en largest, ¿comenzará a consumir desde el desplazamiento 5?

  3. ¿El comportamiento con respecto a este tipo de escenario es siempre determinista?

No dude en comentar si algo en mi pregunta no está claro. Gracias por adelantado.

Asif Iqbal
fuente

Respuestas:

260

Es un poco más complejo de lo que describiste.
La auto.offset.resetconfiguración se activa SOLO si su grupo de consumidores no tiene una compensación válida confirmada en algún lugar (2 almacenes de compensación compatibles ahora son Kafka y Zookeeper), y también depende del tipo de consumidor que utilice.

Si utiliza un consumidor de Java de alto nivel, imagine los siguientes escenarios:

  1. Tiene un consumidor en un grupo de consumidores group1que ha consumido 5 mensajes y murió. La próxima vez que inicie este consumidor, ni siquiera usará esa auto.offset.resetconfiguración y continuará desde el lugar donde murió porque solo obtendrá el desplazamiento almacenado del almacenamiento de compensación (Kafka o ZK como mencioné).

  2. Tiene mensajes en un tema (como lo describió) y comienza un consumidor en un nuevo grupo de consumidores group2. No hay ningún desplazamiento almacenado en ningún lado y esta vez la auto.offset.resetconfiguración decidirá si comenzar desde el principio del tema ( earliest) o desde el final del tema ( latest)

Una cosa más que afecta a qué valor de compensación corresponderá earliesty latestconfigurará la política de retención de registros. Imagine que tiene un tema con retención configurada en 1 hora. Produce 5 mensajes y luego, una hora más tarde, publica 5 mensajes más. El latestdesplazamiento seguirá siendo el mismo que en el ejemplo anterior, pero earliestno será posible 0porque Kafka ya eliminará estos mensajes y, por lo tanto, el desplazamiento disponible más temprano será 5.

Todo lo mencionado anteriormente no está relacionado SimpleConsumery cada vez que lo ejecute, decidirá dónde comenzar a usar la auto.offset.resetconfiguración.

Si utiliza la versión Kafka mayores de 0,9, usted tiene que reemplazar earliest, latestcon smallest, largest.

serejja
fuente
3
Muchas gracias por la respuesta. Entonces, en cuanto al consumidor de alto nivel, una vez que un consumidor tiene algo comprometido (ya sea en ZK o Kafka), auto.offset.reset¿no tiene ningún significado después? ¿El único significado de esa configuración es cuando no hay nada comprometido (e idealmente eso sería en la primera puesta en marcha del consumidor)?
Asif Iqbal
2
Exactamente como lo describiste
serejja
1
@serejja Hola, ¿qué tal si siempre tengo 1 consumidor por grupo, y el escenario # 1 de tu respuesta ocurre para mí? ¿Será lo mismo?
ha9u63ar
1
@ ha9u63ar no entendió bien tu pregunta. Si reinicia su consumidor en el mismo grupo, sí, no lo usará auto.offset.resety continuará desde el desplazamiento comprometido. Si siempre usa un grupo de consumidores diferente (como generarlo al iniciar el consumidor), entonces el consumidor siempre respetaráauto.offset.reset
serejja
@serejja sí y eso no funciona para mí. ¿podría echarle un vistazo a esto ? Este es mi problema
ha9u63ar
83

Solo una actualización: desde Kafka 0.9 en adelante, Kafka está utilizando una nueva versión Java del consumidor y los nombres de los parámetros auto.offset.reset han cambiado; Del manual:

Qué hacer cuando no hay un desplazamiento inicial en Kafka o si el desplazamiento actual ya no existe en el servidor (por ejemplo, porque esos datos se han eliminado):

más temprano : restablece automáticamente el desplazamiento al primer desplazamiento

último : restablece automáticamente el desplazamiento al último desplazamiento

none : lanzar una excepción al consumidor si no se encuentra una compensación previa para el grupo de consumidores

cualquier otra cosa: lanzar una excepción al consumidor.

Pasé algún tiempo para encontrar esto después de verificar la respuesta aceptada, así que pensé que podría ser útil para la comunidad publicarla.

Zinc de Israel
fuente
9

Más aún hay compensaciones.retención.minutos. Si el tiempo desde la última confirmación es> offsets.retention.minutes, auto.offset.resettambién se activa

Sasa Ninkovic
fuente
1
¿No parece esto redundante con la retención de registros? ¿Debería la retención establecida basarse en la retención de registros?
mike01010
@ mike01010 eso es correcto. Debe basarse en la retención de registros, esa es una de las soluciones propuestas en el ticket. Prolong default value of offsets.retention.minutes to be at least twice larger than log.retention.hours. issues.apache.org/jira/browse/KAFKA-3806
saheb
Esa respuesta me asustó por un tiempo, hasta que verifiqué la documentación de offsets.retention.minutes: <b> Después de que un grupo de consumidores pierde a todos sus consumidores (es decir, queda vacío), sus compensaciones se mantendrán durante este período de retención antes de ser descartadas. </b> Por separado consumidores (mediante asignación manual), las compensaciones caducarán después del último compromiso más este período de retención. (Esto es para Kafka 2.3)
jumping_monkey