¿Hay alguna manera de purgar el tema en kafka?
Introduje un mensaje que era demasiado grande en un tema de mensaje kafka en mi máquina local, ahora recibo un error:
kafka.common.InvalidMessageSizeException: invalid message size
Aumentar el fetch.size
no es ideal aquí, porque en realidad no quiero aceptar mensajes tan grandes.
apache-kafka
purge
Peter Klipfel
fuente
fuente
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
--delete-config retention.ms
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
Esto también le permite verificar el período de retención actual, por ejemplo, kafka-configs --zookeeper <zkhost>: 2181 --describe --entity-type topics --entity-name <topic name>Para purgar la cola, puede eliminar el tema:
luego vuelva a crearlo:
fuente
delete.topic.enable=true
en el archivoconfig/server.properties
, como dice la advertencia impresa por el comando mencionadoNote: This will have no impact if delete.topic.enable is not set to true.
Estos son los pasos que sigo para eliminar un tema llamado
MyTopic
:rm -rf /tmp/kafka-logs/MyTopic-0
. Repita para otras particiones y todas las réplicaszkCli.sh
luegormr /brokers/MyTopic
Si pierde el paso 3, Apache Kafka continuará informando que el tema está presente (por ejemplo, si se ejecuta
kafka-list-topic.sh
).Probado con Apache Kafka 0.8.0.
fuente
./zookeeper-shell.sh localhost:2181
y./kafka-topics.sh --list --zookeeper localhost:2181
zookeeper-client
en lugar dezkCli.sh
(tratado en Cloudera CDH5)Si bien la respuesta aceptada es correcta, ese método ha quedado en desuso. La configuración del tema ahora debe hacerse a través de
kafka-configs
.Las configuraciones establecidas a través de este método se pueden mostrar con el comando
fuente
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
Probado en Kafka 0.8.2, para el ejemplo de inicio rápido: Primero, agregue una línea al archivo server.properties en la carpeta config:
entonces, puedes ejecutar este comando:
fuente
De kafka 1.1
Purgar un tema
espere 1 minuto, para asegurarse de que kafka purgue el tema, elimine la configuración y luego vaya al valor predeterminado
fuente
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
kafka no tiene un método directo para el tema de purga / limpieza (Colas), pero puede hacerlo eliminando ese tema y recreándolo.
primero asegúrese de que el archivo sever.properties tenga y si no, agregue
delete.topic.enable=true
luego, Eliminar tema
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
luego créelo nuevamente.
fuente
A veces, si tiene un clúster saturado (demasiadas particiones, o usa datos de temas cifrados, o usa SSL, o el controlador está en un nodo defectuoso, o la conexión es escasa, tomará mucho tiempo purgar dicho tema .
Sigo estos pasos, especialmente si estás usando Avro.
1: Ejecutar con herramientas kafka:
2: Ejecutar en el nodo de registro de esquema:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Establezca la retención del tema nuevamente a la configuración original, una vez que el tema esté vacío.
Espero que esto ayude a alguien, ya que no se anuncia fácilmente.
fuente
kafka-avro-console-consumer
no es necesarioACTUALIZACIÓN: Esta respuesta es relevante para Kafka 0.6. Para Kafka 0.8 y posteriores, ver la respuesta de @Patrick.
Sí, detenga kafka y elimine manualmente todos los archivos del subdirectorio correspondiente (es fácil encontrarlo en el directorio de datos de kafka). Después de reiniciar kafka, el tema estará vacío.
fuente
El enfoque más simple es establecer que la fecha de los archivos de registro individuales sea anterior al período de retención. Luego, el corredor debe limpiarlos y eliminarlos en unos segundos. Esto ofrece varias ventajas:
En mi experiencia con Kafka 0.7.x, eliminar los archivos de registro y reiniciar el corredor podría generar excepciones de compensación no válidas para ciertos consumidores. Esto sucedería porque el intermediario reinicia las compensaciones en cero (en ausencia de archivos de registro existentes), y un consumidor que anteriormente consumía el tema se volvería a conectar para solicitar un desplazamiento específico [una vez válido]. Si este desplazamiento cae fuera de los límites de los nuevos registros de temas, entonces no hay daño y el consumidor continúa al principio o al final. Pero, si el desplazamiento se encuentra dentro de los límites de los nuevos registros de temas, el intermediario intenta recuperar el conjunto de mensajes pero falla porque el desplazamiento no se alinea con un mensaje real.
Esto podría mitigarse eliminando también las compensaciones del consumidor en el cuidador del zoológico para ese tema. Pero si no necesita un tema virgen y solo desea eliminar el contenido existente, simplemente 'tocar' algunos registros de temas es mucho más fácil y más confiable, que detener a los intermediarios, eliminar registros de temas y borrar ciertos nodos del cuidador del zoológico .
fuente
El consejo de Thomas es excelente, pero desafortunadamente
zkCli
en las versiones antiguas de Zookeeper (por ejemplo, 3.3.6) no parece ser compatiblermr
. Por ejemplo, compare la implementación de la línea de comandos en el Zookeeper moderno con la versión 3.3 .Si se enfrenta a una versión anterior de Zookeeper, una solución es utilizar una biblioteca cliente como zc.zk para Python. Para las personas que no están familiarizadas con Python, debe instalarlo utilizando pip o easy_install . Luego inicia un shell de Python (
python
) y puedes hacer:o incluso
si quieres eliminar todos los temas de Kafka.
fuente
Para limpiar todos los mensajes de un tema en particular usando su grupo de aplicaciones (GroupName debe ser el mismo que el nombre del grupo kafka de la aplicación).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
fuente
Siguiendo la respuesta de @steven appleyard, ejecuté los siguientes comandos en Kafka 2.2.0 y funcionaron para mí.
fuente
Aquí hay muchas respuestas geniales, pero entre ellas, no encontré ninguna sobre Docker. Pasé algún tiempo para darme cuenta de que usar el contenedor de intermediarios es incorrecto para este caso (¡¡¡¡¡¡¡¡¡¡obviamente) !!!
y debería haber usado en
zookeeper:2181
lugar de--zookeeper localhost:2181
según mi archivo de redacciónel comando correcto sería
Espero que ahorre tiempo a alguien.
Además, tenga en cuenta que los mensajes no se eliminarán de inmediato y sucederá cuando se cierre el segmento del registro.
fuente
localhost:2181
... Por ejemplo, está malinterpretando las funciones de red de Docker. Además, no todos los contenedores de Zookeeper tienenkafka-topics
, por lo que es mejor no usarlo de esa manera. Las últimas instalaciones de Kafka permiten--bootstrap-servers
alterar un tema en lugar de--zookeeper
you can use
--zookeeper zookeeper: 2181` del contenedor Kafka es mi punto. O incluso extraiga la línea Zookeeper del archivo server.propertiesNo se pudo agregar como comentario debido al tamaño: no estoy seguro de si esto es cierto, además de actualizar retención.ms y retención.bytes, pero noté que la política de limpieza del tema debería ser "eliminar" (predeterminado), si es "compacto", va a retener los mensajes por más tiempo, es decir, si es "compacto", también debe especificar delete.retention.ms .
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
También tuve que monitorear las compensaciones más tempranas / más recientes deben ser las mismas para confirmar que esto sucedió con éxito, también puede verificar du -h / tmp / kafka-logs / test-topic-3-100- *
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
El otro problema es que primero debe obtener la configuración actual para que recuerde revertir después de que la eliminación se haya realizado correctamente:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
fuente
Otro enfoque, más bien manual, para purgar un tema es:
en los corredores:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
en cuidador del zoológico:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
en los corredores de nuevo:
sudo service kafka start
fuente
Esto debería dar
retention.ms
configurado. Luego puede usar el comando alter anterior para cambiar a 1 segundo (y luego volver al valor predeterminado)fuente
Desde Java, usando el nuevo en
AdminZkClient
lugar del obsoletoAdminUtils
:fuente
AdminClient
oKafkaAdminClient