¿Cómo puedo enviar mensajes grandes con Kafka (más de 15 MB)?

118

Envío String-messages a Kafka V. 0.8 con la API de Java Producer. Si el tamaño del mensaje es de aproximadamente 15 MB, obtengo un archivo MessageSizeTooLargeException. Intenté establecerlo message.max.bytesen 40 MB, pero todavía obtengo la excepción. Los pequeños mensajes funcionaron sin problemas.

(La excepción aparece en el productor, no tengo un consumidor en esta aplicación).

¿Qué puedo hacer para deshacerme de esta excepción?

Mi ejemplo de configuración de productor

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Registro de errores:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Sonson123
fuente
5
Mi primer instinto sería pedirle que divida este gran mensaje en varios más pequeños: - / Supongo que esto no es posible por alguna razón, pero es posible que desee reconsiderarlo de todos modos: los mensajes enormes generalmente significan que hay un defecto de diseño en algún lugar que realmente debería arreglarse.
Aaron Digulla
1
Gracias, pero haría mi lógica mucho más compleja. ¿Por qué es una mala idea utilizar Kafka para mensajes de alrededor de 15 MB? ¿Es 1 MB el límite máximo de tamaño de mensaje que se puede utilizar? No encontré mucho sobre el límite de tamaño del mensaje en la documentación de Kafka.
Sonson123
2
Esto no tiene ninguna relación con Kafka o cualquier otro sistema de procesamiento de mensajes. Mi razonamiento: si algo sale mal con su archivo de 15 MB, limpiar el desorden después es muy costoso. Es por eso que generalmente divido archivos grandes en muchos trabajos más pequeños (que generalmente también se pueden ejecutar en paralelo).
Aaron Digulla
ha utilizado alguna compresión? ¿Podría compartir algunos detalles más? Es un poco difícil adivinar algo con una sola palabra
user2720864

Respuestas:

181

Necesita ajustar tres (o cuatro) propiedades:

  • Lado del consumidor: fetch.message.max.bytesesto determinará el tamaño más grande de un mensaje que puede obtener el consumidor.
  • Lado del intermediario: replica.fetch.max.bytesesto permitirá que las réplicas de los intermediarios envíen mensajes dentro del clúster y se asegure de que los mensajes se replican correctamente. Si es demasiado pequeño, el mensaje nunca se replicará y, por lo tanto, el consumidor nunca verá el mensaje porque el mensaje nunca se confirmará (se replicará por completo).
  • Lado del corredor: message.max.byteseste es el tamaño más grande del mensaje que puede recibir el corredor de un productor.
  • Lado del corredor (por tema): max.message.byteseste es el tamaño más grande del mensaje que el corredor permitirá que se agregue al tema. Este tamaño está validado antes de la compresión. (El valor predeterminado es el del corredor message.max.bytes).

Descubrí por las malas el número 2: no recibe NINGUNA excepción, mensaje o advertencia de Kafka, así que asegúrese de considerar esto cuando envíe mensajes grandes.

hombre riendo
fuente
3
Ok, tú y user2720864 estaban en lo correcto. Solo había configurado el message.max.bytesen el código fuente. Pero tengo que establecer estos valores en la configuración del servidor Kafka config/server.properties. Ahora también funcionan los mensajes más grandes :).
Sonson123
3
¿Existe alguna desventaja conocida al establecer estos valores demasiado altos?
Ivan Balashov
7
Si. En el lado del consumidor, asigna fetch.message.max.bytesmemoria para CADA partición. Esto significa que si usa una gran cantidad para fetch.message.max.bytescombinar con una gran cantidad de particiones, consumirá mucha memoria. De hecho, dado que el proceso de replicación entre los corredores también es un consumidor especializado, esto también consumirá memoria en los corredores.
smiling_man
3
Tenga en cuenta que también hay una max.message.bytesconfiguración por tema que puede ser más baja que la del corredor message.max.bytes.
Peter Davis
1
Según el documento oficial, los parámetros del lado del consumidor y los relacionados con la replicación entre corredores /.*fetch.*bytes/no parecen ser límites estrictos: "Este no es un máximo absoluto, si es [...] mayor que este valor, el lote récord todavía se devolverá para garantizar que se pueda avanzar ".
Bluu
56

Se requieren cambios menores para Kafka 0.10 y el nuevo consumidor en comparación con la respuesta de laugh_man :

  • Corredor: Sin cambios, aún necesita aumentar las propiedades message.max.bytesy replica.fetch.max.bytes. message.max.bytestiene que ser igual o menor (*) que replica.fetch.max.bytes.
  • Productor: Aumente max.request.sizepara enviar el mensaje más grande.
  • Consumidor: Aumente max.partition.fetch.bytespara recibir mensajes más grandes.

(*) Lea los comentarios para conocer más sobre message.max.bytes<=replica.fetch.max.bytes

Sascha Vetter
fuente
2
¿Sabes por qué message.max.bytesnecesita ser más pequeño que replica.fetch.max.bytes?
Kostas
2
" replica.fetch.max.bytes (predeterminado: 1 MB): tamaño máximo de datos que un corredor puede replicar. Debe ser mayor que message.max.bytes , o un corredor aceptará mensajes y no podrá replicarlos. posible pérdida de datos ". Fuente: handling-large-messages-kafka
Sascha Vetter
2
Gracias por responderme con un enlace. Esto también parece hacer eco de lo que sugiere la guía de Cloudera . Sin embargo, ambos son incorrectos; tenga en cuenta que no ofrecen ninguna razón técnica de por qué replica.fetch.max.bytes debería ser estrictamente más grande que message.max.bytes. Un empleado de Confluent confirmó hoy lo que sospechaba: que las dos cantidades pueden, de hecho, ser iguales.
Kostas
2
¿Hay alguna actualización sobre message.max.bytes<replica.fetch.max.byteso message.max.bytes=replica.fetch.max.bytes@Kostas?
Sascha Vetter
2
Sí, pueden ser iguales: mail-archive.com/[email protected]/msg25494.html (Ismael trabaja para Confluent)
Kostas
13

Debe anular las siguientes propiedades:

Configuraciones del agente ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Consumer Configs ($ KAFKA_HOME / config / consumer.properties)
Este paso no funcionó para mí. Lo agregué a la aplicación del consumidor y estaba funcionando bien

  • fetch.message.max.bytes

Reinicie el servidor.

consulte esta documentación para obtener más información: http://kafka.apache.org/08/configuration.html

usuario2550587
fuente
1
para el consumidor de la línea de comandos, necesito usar el indicador --fetch-size = <bytes>. No parece leer el archivo consumer.properties (kafka 0.8.1). También recomendaría activar la compresión desde el lado del productor usando la opción compressed.codec.
Ziggy Eunicien
El comentario de Ziggy funcionó para mí kafka 0.8.1.1. ¡Gracias!
James
¿Podría ser que fetch.message.max.bytes sea reemplazado por max.partition.fetch.bytes en ConsumerConfig?
s_bei
12

La idea es enviar el mismo tamaño de mensaje desde Kafka Producer a Kafka Broker y luego ser recibido por Kafka Consumer ie

Productor de Kafka -> Agente de Kafka -> Consumidor de Kafka

Supongamos que si el requisito es enviar 15 MB de mensaje, entonces el productor , el corredor y el consumidor , los tres, deben estar sincronizados.

Kafka Producer envía 15 MB -> Kafka Broker permite / almacena 15 MB -> Kafka Consumer recibe 15 MB

Por tanto, el ajuste debería ser:

a) en Broker:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) sobre el consumidor:

fetch.message.max.bytes=15728640
Ravi
fuente
2
¿Podría ser que fetch.message.max.bytes sea reemplazado por max.partition.fetch.bytes en ConsumerConfig?
s_bei
7

Una cosa clave para recordar es que el message.max.bytesatributo debe estar sincronizado con la fetch.message.max.bytespropiedad del consumidor . el tamaño de recuperación debe ser al menos tan grande como el tamaño máximo del mensaje, de lo contrario, podría haber una situación en la que los productores puedan enviar mensajes más grandes de lo que el consumidor puede consumir / recuperar. Puede que valga la pena echarle un vistazo.
¿Qué versión de Kafka estás usando? También proporcione algunos detalles más del seguimiento que está obteniendo. ¿hay algo como ... payload size of xxxx larger than 1000000en el registro?

usuario2720864
fuente
1
He actualizado mi pregunta con más información: Kafka Version 2.8.0-0.8.0; ahora solo necesito al productor.
Sonson123
6

La respuesta de @laughing_man es bastante precisa. Pero aún así, quería dar una recomendación que aprendí del experto en Kafka Stephane Maarek de Quora.

Kafka no está diseñado para manejar mensajes grandes.

Su API debe usar almacenamiento en la nube (Ex AWS S3) y simplemente enviar a Kafka o cualquier agente de mensajes una referencia de S3. Debe encontrar un lugar para conservar sus datos, tal vez sea una unidad de red, tal vez sea lo que sea, pero no debería ser un intermediario de mensajes.

Ahora, si no quiere ir con la solución anterior

El tamaño máximo del mensaje es 1 MB (la configuración en sus corredores se llama message.max.bytes) Apache Kafka . Si realmente lo necesitara con urgencia, podría aumentar ese tamaño y asegurarse de aumentar los búferes de red para sus productores y consumidores.

Y si realmente le importa dividir su mensaje, asegúrese de que cada división de mensaje tenga exactamente la misma clave para que se envíe a la misma partición, y el contenido de su mensaje debe informar una "identificación de parte" para que su consumidor pueda reconstruir completamente el mensaje .

También puede explorar la compresión, si su mensaje está basado en texto (compresión gzip, snappy, lz4), lo que puede reducir el tamaño de los datos, pero no mágicamente.

Nuevamente, debe usar un sistema externo para almacenar esos datos y simplemente enviar una referencia externa a Kafka. Esa es una arquitectura muy común, y una con la que debe ir y es ampliamente aceptada.

Tenga esto en cuenta que Kafka funciona mejor solo si los mensajes son enormes en cantidad pero no en tamaño.

Fuente: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Bhanu Hoysala
fuente
4
Es posible que desee tener en cuenta que "su" recomendación es una copia casi palabra por palabra de la recomendación de Quora de Stéphane Maarek en quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
Mike
Kafka trabaja con mensajes grandes, sin ningún problema. La página de introducción en la página de inicio de Kafka incluso hace referencia a ella como un sistema de almacenamiento.
calloc_org
3

Para las personas que usan landoop kafka: puede pasar los valores de configuración en las variables de entorno como:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

Y si está usando rdkafka, pase el message.max.bytes en la configuración del productor como:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Del mismo modo, para el consumidor,

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
informador
fuente