Envío String-messages a Kafka V. 0.8 con la API de Java Producer. Si el tamaño del mensaje es de aproximadamente 15 MB, obtengo un archivo MessageSizeTooLargeException
. Intenté establecerlo message.max.bytes
en 40 MB, pero todavía obtengo la excepción. Los pequeños mensajes funcionaron sin problemas.
(La excepción aparece en el productor, no tengo un consumidor en esta aplicación).
¿Qué puedo hacer para deshacerme de esta excepción?
Mi ejemplo de configuración de productor
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Registro de errores:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
java
apache-kafka
Sonson123
fuente
fuente
Respuestas:
Necesita ajustar tres (o cuatro) propiedades:
fetch.message.max.bytes
esto determinará el tamaño más grande de un mensaje que puede obtener el consumidor.replica.fetch.max.bytes
esto permitirá que las réplicas de los intermediarios envíen mensajes dentro del clúster y se asegure de que los mensajes se replican correctamente. Si es demasiado pequeño, el mensaje nunca se replicará y, por lo tanto, el consumidor nunca verá el mensaje porque el mensaje nunca se confirmará (se replicará por completo).message.max.bytes
este es el tamaño más grande del mensaje que puede recibir el corredor de un productor.max.message.bytes
este es el tamaño más grande del mensaje que el corredor permitirá que se agregue al tema. Este tamaño está validado antes de la compresión. (El valor predeterminado es el del corredormessage.max.bytes
).Descubrí por las malas el número 2: no recibe NINGUNA excepción, mensaje o advertencia de Kafka, así que asegúrese de considerar esto cuando envíe mensajes grandes.
fuente
message.max.bytes
en el código fuente. Pero tengo que establecer estos valores en la configuración del servidor Kafkaconfig/server.properties
. Ahora también funcionan los mensajes más grandes :).fetch.message.max.bytes
memoria para CADA partición. Esto significa que si usa una gran cantidad parafetch.message.max.bytes
combinar con una gran cantidad de particiones, consumirá mucha memoria. De hecho, dado que el proceso de replicación entre los corredores también es un consumidor especializado, esto también consumirá memoria en los corredores.max.message.bytes
configuración por tema que puede ser más baja que la del corredormessage.max.bytes
./.*fetch.*bytes/
no parecen ser límites estrictos: "Este no es un máximo absoluto, si es [...] mayor que este valor, el lote récord todavía se devolverá para garantizar que se pueda avanzar ".Se requieren cambios menores para Kafka 0.10 y el nuevo consumidor en comparación con la respuesta de laugh_man :
message.max.bytes
yreplica.fetch.max.bytes
.message.max.bytes
tiene que ser igual o menor (*) quereplica.fetch.max.bytes
.max.request.size
para enviar el mensaje más grande.max.partition.fetch.bytes
para recibir mensajes más grandes.(*) Lea los comentarios para conocer más sobre
message.max.bytes
<=replica.fetch.max.bytes
fuente
message.max.bytes
necesita ser más pequeño quereplica.fetch.max.bytes
?replica.fetch.max.bytes
debería ser estrictamente más grande quemessage.max.bytes
. Un empleado de Confluent confirmó hoy lo que sospechaba: que las dos cantidades pueden, de hecho, ser iguales.message.max.bytes<replica.fetch.max.bytes
omessage.max.bytes=replica.fetch.max.bytes
@Kostas?Debe anular las siguientes propiedades:
Configuraciones del agente ($ KAFKA_HOME / config / server.properties)
Consumer Configs ($ KAFKA_HOME / config / consumer.properties)
Este paso no funcionó para mí. Lo agregué a la aplicación del consumidor y estaba funcionando bien
Reinicie el servidor.
consulte esta documentación para obtener más información: http://kafka.apache.org/08/configuration.html
fuente
La idea es enviar el mismo tamaño de mensaje desde Kafka Producer a Kafka Broker y luego ser recibido por Kafka Consumer ie
Productor de Kafka -> Agente de Kafka -> Consumidor de Kafka
Supongamos que si el requisito es enviar 15 MB de mensaje, entonces el productor , el corredor y el consumidor , los tres, deben estar sincronizados.
Kafka Producer envía 15 MB -> Kafka Broker permite / almacena 15 MB -> Kafka Consumer recibe 15 MB
Por tanto, el ajuste debería ser:
a) en Broker:
b) sobre el consumidor:
fuente
Una cosa clave para recordar es que el
message.max.bytes
atributo debe estar sincronizado con lafetch.message.max.bytes
propiedad del consumidor . el tamaño de recuperación debe ser al menos tan grande como el tamaño máximo del mensaje, de lo contrario, podría haber una situación en la que los productores puedan enviar mensajes más grandes de lo que el consumidor puede consumir / recuperar. Puede que valga la pena echarle un vistazo.¿Qué versión de Kafka estás usando? También proporcione algunos detalles más del seguimiento que está obteniendo. ¿hay algo como ...
payload size of xxxx larger than 1000000
en el registro?fuente
La respuesta de @laughing_man es bastante precisa. Pero aún así, quería dar una recomendación que aprendí del experto en Kafka Stephane Maarek de Quora.
Kafka no está diseñado para manejar mensajes grandes.
Su API debe usar almacenamiento en la nube (Ex AWS S3) y simplemente enviar a Kafka o cualquier agente de mensajes una referencia de S3. Debe encontrar un lugar para conservar sus datos, tal vez sea una unidad de red, tal vez sea lo que sea, pero no debería ser un intermediario de mensajes.
Ahora, si no quiere ir con la solución anterior
El tamaño máximo del mensaje es 1 MB (la configuración en sus corredores se llama
message.max.bytes
) Apache Kafka . Si realmente lo necesitara con urgencia, podría aumentar ese tamaño y asegurarse de aumentar los búferes de red para sus productores y consumidores.Y si realmente le importa dividir su mensaje, asegúrese de que cada división de mensaje tenga exactamente la misma clave para que se envíe a la misma partición, y el contenido de su mensaje debe informar una "identificación de parte" para que su consumidor pueda reconstruir completamente el mensaje .
También puede explorar la compresión, si su mensaje está basado en texto (compresión gzip, snappy, lz4), lo que puede reducir el tamaño de los datos, pero no mágicamente.
Nuevamente, debe usar un sistema externo para almacenar esos datos y simplemente enviar una referencia externa a Kafka. Esa es una arquitectura muy común, y una con la que debe ir y es ampliamente aceptada.
Tenga esto en cuenta que Kafka funciona mejor solo si los mensajes son enormes en cantidad pero no en tamaño.
Fuente: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
fuente
Para las personas que usan landoop kafka: puede pasar los valores de configuración en las variables de entorno como:
Y si está usando rdkafka, pase el message.max.bytes en la configuración del productor como:
Del mismo modo, para el consumidor,
fuente