Estoy usando apache kafka para enviar mensajes. Implementé el productor y el consumidor en Java. ¿Cómo podemos obtener la cantidad de mensajes en un tema?
java
messages
apache-kafka
Chetan
fuente
fuente
No es Java, pero puede ser útil.
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
fuente
bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609
¿Y luego la diferencia devuelve mensajes pendientes reales en el tema? ¿Estoy en lo correcto?De hecho, lo uso para comparar mi POC. El elemento que desea utilizar ConsumerOffsetChecker. Puede ejecutarlo usando el script bash como se muestra a continuación.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
Y a continuación se muestra el resultado: como puede ver en el cuadro rojo, 999 es el número de mensaje actualmente en el tema.
Actualización: ConsumerOffsetChecker está en desuso desde 0.10.0, es posible que desee comenzar a usar ConsumerGroupCommand.
fuente
A veces, el interés está en conocer la cantidad de mensajes en cada partición, por ejemplo, cuando se prueba un particionador personalizado. Los pasos siguientes han sido probados para trabajar con Kafka 0.10.2.1-2 de Confluent 3.2. Dado un tema de Kafka
kt
y la siguiente línea de comandos:$ kafka-run-class kafka.tools.GetOffsetShell \ --broker-list host01:9092,host02:9092,host02:9092 --topic kt
Eso imprime la salida de muestra que muestra el recuento de mensajes en las tres particiones:
kt:2:6138 kt:1:6123 kt:0:6137
El número de líneas puede ser más o menos dependiendo del número de particiones del tema.
fuente
Dado
ConsumerOffsetChecker
que ya no es compatible, puede usar este comando para verificar todos los mensajes en el tema:bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \ --group my-group \ --bootstrap-server localhost:9092 \ --describe
¿Dónde
LAG
está el recuento de mensajes en la partición de tema?También puedes intentar usar kafkacat . Este es un proyecto de código abierto que puede ayudarlo a leer mensajes de un tema y partición y los imprime en stdout. Aquí hay una muestra que lee los últimos 10 mensajes del
sample-kafka-topic
tema y luego sale:kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
fuente
Utilice https://prestodb.io/docs/current/connector/kafka-tutorial.html
Un motor súper SQL, proporcionado por Facebook, que se conecta a varias fuentes de datos (Cassandra, Kafka, JMX, Redis ...).
PrestoDB se ejecuta como un servidor con trabajadores opcionales (hay un modo independiente sin trabajadores adicionales), luego usa un pequeño JAR ejecutable (llamado presto CLI) para realizar consultas.
Una vez que haya configurado bien el servidor de Presto, puede usar SQL tradicional:
SELECT count(*) FROM TOPIC_NAME;
fuente
Comando de Apache Kafka para obtener mensajes no manejados en todas las particiones de un tema:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group
Huellas dactilares:
Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none
La columna 6 son los mensajes no tratados. Súmalos así:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'
awk lee las filas, salta la línea del encabezado y suma la sexta columna y al final imprime la suma.
Huellas dactilares
5
fuente
Para obtener todos los mensajes almacenados para el tema, puede buscar el consumidor al principio y al final de la secuencia para cada partición y sumar los resultados.
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
fuente
Ejecute lo siguiente (asumiendo que
kafka-console-consumer.sh
está en la ruta):kafka-console-consumer.sh --from-beginning \ --bootstrap-server yourbroker:9092 --property print.key=true \ --property print.value=false --property print.partition \ --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
fuente
--new-consumer
Con el cliente Java de Kafka 2.11-1.0.0, puede hacer lo siguiente:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } }
La salida es algo como esto:
offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13
fuente
seekToEnd(..)
yseekToBeginning(..)
métodos que cambian el estado delconsumer
.En las versiones más recientes de Kafka Manager, hay una columna titulada Compensaciones recientes sumadas .
fuente
Tenía esta misma pregunta y así es como lo estoy haciendo, de un KafkaConsumer, en Kotlin:
Código muy aproximado, ya que acabo de hacer que esto funcione, pero básicamente desea restar el desplazamiento inicial del tema del desplazamiento final y este será el recuento de mensajes actual para el tema.
No puede simplemente confiar en el desplazamiento final debido a otras configuraciones (política de limpieza, retención-ms, etc.) que pueden terminar causando la eliminación de mensajes antiguos de su tema. Las compensaciones solo se "mueven" hacia adelante, por lo que es la compensación inicial la que se acercará más a la compensación final (o eventualmente al mismo valor, si el tema no contiene ningún mensaje en este momento).
Básicamente, el desplazamiento final representa el número total de mensajes que pasaron por ese tema, y la diferencia entre los dos representa el número de mensajes que contiene el tema en este momento.
fuente
Extractos de documentos de Kafka
Deprecaciones en 0.9.0.0
Kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) ha quedado obsoleto. En el futuro, utilice kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) para esta funcionalidad.
Estoy ejecutando el agente Kafka con SSL habilitado tanto para el servidor como para el cliente. Debajo del comando que uso
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
donde / tmp / ssl_config es el siguiente
fuente
Si tiene acceso a la interfaz JMX del servidor, las compensaciones de inicio y finalización están presentes en:
(necesita reemplazar
TOPICNAME
&PARTITIONNUMBER
). Tenga en cuenta que debe verificar cada una de las réplicas de una partición determinada, o debe averiguar cuál de los corredores es el líder para una partición determinada (y esto puede cambiar con el tiempo).Alternativamente, puede utilizar los métodos Kafka Consumer
beginningOffsets
yendOffsets
.fuente
No he probado esto mismo, pero parece tener sentido.
También puede usar
kafka.tools.ConsumerOffsetChecker
( fuente ).fuente
La forma más sencilla que he encontrado es usar la API REST de Kafdrop
/topic/topicName
y especificar la clave:"Accept"
/ valor:"application/json"
encabezado para obtener una respuesta JSON.Esto está documentado aquí .
fuente
Puede usar kafkatool . Consulte este enlace -> http://www.kafkatool.com/download.html
fuente