Java, cómo obtener la cantidad de mensajes en un tema en apache kafka

93

Estoy usando apache kafka para enviar mensajes. Implementé el productor y el consumidor en Java. ¿Cómo podemos obtener la cantidad de mensajes en un tema?

Chetan
fuente

Respuestas:

34

La única forma que se le ocurre para esto desde el punto de vista del consumidor es consumir los mensajes y contarlos.

El corredor de Kafka expone los contadores JMX para la cantidad de mensajes recibidos desde el inicio, pero no puede saber cuántos de ellos ya se han depurado.

En la mayoría de los escenarios comunes, los mensajes en Kafka se ven mejor como un flujo infinito y obtener un valor discreto de cuántos se mantienen actualmente en el disco no es relevante. Además, las cosas se complican más cuando se trata de un grupo de agentes que tienen un subconjunto de mensajes en un tema.

Lundahl
fuente
Vea mi respuesta stackoverflow.com/a/47313863/2017567 . El cliente Java Kafka permite obtener esa información.
Christophe Quintard
96

No es Java, pero puede ser útil.

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 
  | awk -F  ":" '{sum += $3} END {print sum}'
ssemichev
fuente
12
¿No debería ser esta la diferencia entre el primer y el último desplazamiento por suma de partición? bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609 ¿Y luego la diferencia devuelve mensajes pendientes reales en el tema? ¿Estoy en lo correcto?
kisna
1
Sí, eso es verdad. Debe calcular una diferencia si las primeras compensaciones no son iguales a cero.
ssemichev
Eso es lo que pensé :).
kisna
1
¿Hay ALGUNA forma de usar eso como una API y así dentro de un código (JAVA, Scala o Python)?
salvob
Aquí hay una combinación de mi código y el código de Kafka. Puede ser útil. Lo utilicé para la transmisión de Spark - Kafka integración KafkaClient gist.github.com/ssemichev/c2d94dce7ad65339c9637e1b461f86cf KafkaCluster gist.github.com/ssemichev/fa3605c7b10cb6c7b9c8ab54ffbc5865
ssemichev
17

De hecho, lo uso para comparar mi POC. El elemento que desea utilizar ConsumerOffsetChecker. Puede ejecutarlo usando el script bash como se muestra a continuación.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

Y a continuación se muestra el resultado: ingrese la descripción de la imagen aquí como puede ver en el cuadro rojo, 999 es el número de mensaje actualmente en el tema.

Actualización: ConsumerOffsetChecker está en desuso desde 0.10.0, es posible que desee comenzar a usar ConsumerGroupCommand.

Rudy
fuente
1
Tenga en cuenta que ConsumerOffsetChecker está en desuso y se eliminará en las versiones posteriores a 0.9.0. En su lugar, utilice ConsumerGroupCommand. (kafka.tools.ConsumerOffsetChecker $)
Szymon Sadło
1
Sí, eso es lo que dije.
Rudy
Tu última oración no es precisa. El comando anterior todavía funciona en 0.10.0.1 y la advertencia es la misma que mi comentario anterior.
Szymon Sadło
14

A veces, el interés está en conocer la cantidad de mensajes en cada partición, por ejemplo, cuando se prueba un particionador personalizado. Los pasos siguientes han sido probados para trabajar con Kafka 0.10.2.1-2 de Confluent 3.2. Dado un tema de Kafka kty la siguiente línea de comandos:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

Eso imprime la salida de muestra que muestra el recuento de mensajes en las tres particiones:

kt:2:6138
kt:1:6123
kt:0:6137

El número de líneas puede ser más o menos dependiendo del número de particiones del tema.

pdp
fuente
4
Si la compactación de registros está habilitada, es posible que la suma de las compensaciones de las particiones no proporcione el recuento exacto de mensajes en el tema.
14

Dado ConsumerOffsetCheckerque ya no es compatible, puede usar este comando para verificar todos los mensajes en el tema:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

¿Dónde LAGestá el recuento de mensajes en la partición de tema?

ingrese la descripción de la imagen aquí

También puedes intentar usar kafkacat . Este es un proyecto de código abierto que puede ayudarlo a leer mensajes de un tema y partición y los imprime en stdout. Aquí hay una muestra que lee los últimos 10 mensajes del sample-kafka-topictema y luego sale:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
ruslangm
fuente
11

Utilice https://prestodb.io/docs/current/connector/kafka-tutorial.html

Un motor súper SQL, proporcionado por Facebook, que se conecta a varias fuentes de datos (Cassandra, Kafka, JMX, Redis ...).

PrestoDB se ejecuta como un servidor con trabajadores opcionales (hay un modo independiente sin trabajadores adicionales), luego usa un pequeño JAR ejecutable (llamado presto CLI) para realizar consultas.

Una vez que haya configurado bien el servidor de Presto, puede usar SQL tradicional:

SELECT count(*) FROM TOPIC_NAME;
Thomas Decaux
fuente
esta herramienta es buena, pero si no funciona si su tema tiene más de 2 puntos.
armandfp
7

Comando de Apache Kafka para obtener mensajes no manejados en todas las particiones de un tema:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

Huellas dactilares:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

La columna 6 son los mensajes no tratados. Súmalos así:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

awk lee las filas, salta la línea del encabezado y suma la sexta columna y al final imprime la suma.

Huellas dactilares

5
Eric Leschinski
fuente
5

Para obtener todos los mensajes almacenados para el tema, puede buscar el consumidor al principio y al final de la secuencia para cada partición y sumar los resultados.

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
AutomatedMike
fuente
1
Por cierto, si tiene la compactación activada, es posible que haya espacios en el flujo, por lo que la cantidad real de mensajes puede ser menor que el total calculado aquí. Para obtener un total exacto, tendrá que reproducir los mensajes y contarlos.
AutomatedMike
5

Ejecute lo siguiente (asumiendo que kafka-console-consumer.shestá en la ruta):

kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
Borislav Markov
fuente
Nota: --new-consumer
eliminé
3

Con el cliente Java de Kafka 2.11-1.0.0, puede hacer lo siguiente:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

La salida es algo como esto:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
Christophe Quintard
fuente
Yo prefiero su respuesta a la comparación de la respuesta @AutomatedMike ya que su respuesta no pierde el tiempo con seekToEnd(..)y seekToBeginning(..)métodos que cambian el estado del consumer.
adaslaw
2

En las versiones más recientes de Kafka Manager, hay una columna titulada Compensaciones recientes sumadas .

ingrese la descripción de la imagen aquí

f01
fuente
2

Tenía esta misma pregunta y así es como lo estoy haciendo, de un KafkaConsumer, en Kotlin:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

Código muy aproximado, ya que acabo de hacer que esto funcione, pero básicamente desea restar el desplazamiento inicial del tema del desplazamiento final y este será el recuento de mensajes actual para el tema.

No puede simplemente confiar en el desplazamiento final debido a otras configuraciones (política de limpieza, retención-ms, etc.) que pueden terminar causando la eliminación de mensajes antiguos de su tema. Las compensaciones solo se "mueven" hacia adelante, por lo que es la compensación inicial la que se acercará más a la compensación final (o eventualmente al mismo valor, si el tema no contiene ningún mensaje en este momento).

Básicamente, el desplazamiento final representa el número total de mensajes que pasaron por ese tema, y ​​la diferencia entre los dos representa el número de mensajes que contiene el tema en este momento.

LeYAUable
fuente
0

Extractos de documentos de Kafka

Deprecaciones en 0.9.0.0

Kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) ha quedado obsoleto. En el futuro, utilice kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) para esta funcionalidad.

Estoy ejecutando el agente Kafka con SSL habilitado tanto para el servidor como para el cliente. Debajo del comando que uso

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

donde / tmp / ssl_config es el siguiente

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
SR Bandi
fuente
0

Si tiene acceso a la interfaz JMX del servidor, las compensaciones de inicio y finalización están presentes en:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(necesita reemplazar TOPICNAME& PARTITIONNUMBER). Tenga en cuenta que debe verificar cada una de las réplicas de una partición determinada, o debe averiguar cuál de los corredores es el líder para una partición determinada (y esto puede cambiar con el tiempo).

Alternativamente, puede utilizar los métodos Kafka ConsumerbeginningOffsets y endOffsets.

Adam Kotwasinski
fuente
-1

No he probado esto mismo, pero parece tener sentido.

También puede usar kafka.tools.ConsumerOffsetChecker( fuente ).

hba
fuente
-1

La forma más sencilla que he encontrado es usar la API REST de Kafdrop /topic/topicNamey especificar la clave: "Accept"/ valor: "application/json"encabezado para obtener una respuesta JSON.

Esto está documentado aquí .

Sebastián Sturza
fuente
-1

Puede usar kafkatool . Consulte este enlace -> http://www.kafkatool.com/download.html

Kafka Tool es una aplicación GUI para administrar y usar clústeres de Apache Kafka. Proporciona una interfaz de usuario intuitiva que permite ver rápidamente los objetos dentro de un clúster de Kafka, así como los mensajes almacenados en los temas del clúster.ingrese la descripción de la imagen aquí

Vinto
fuente