Kafka transmite casos de uso para agregar una tienda global

8

Al definir una topología en flujos kafka, se puede agregar una tienda de estado global. Necesitará un tema fuente, así como a ProcessorSupplier. El procesador recibe registros y podría transformarlos teóricamente antes de agregarlos a la tienda. Pero en caso de restauración, los registros se insertan directamente desde el tema fuente (registro de cambios) en el almacén de estado global, omitiendo la eventual transformación realizada en el procesador.

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder # addGlobalStore (StoreBuilder storeBuilder, Tema de cadena, Consumido consumido, ProcessorSupplier stateUpdateSupplier) Agrega un StateStore global a la topología.

Según la documentación

NOTA: no debe usar el procesador para insertar registros transformados en el almacén de estado global . Esta tienda utiliza el tema de origen como registro de cambios y durante la restauración insertará registros directamente desde el origen . Este ProcessorNode debe usarse para mantener actualizado StateStore.

Paralelamente, ya que el error principal está actualmente abierto en el rastreador de errores kafka: el procesador personalizado KAFKA-7663 suministrado en addGlobalStore no se utiliza al restaurar el estado del tema que explica exactamente lo que se indica en la documentación, pero parece ser un error aceptado.

Me pregunto si KAFKA-7663 es realmente un error o no. Según la documentación, parece haber sido diseñado de esta manera, en cuyo caso me cuesta entender el caso de uso.
¿Alguien puede explicar los principales casos de uso de esta API de bajo nivel? Lo único que se me ocurre es procesar los efectos secundarios, como por ejemplo, realizar algunas operaciones de registro en el procesador.

Pregunta adicional: si el tema de origen actúa como el registro de cambios de la tienda global, cuando un registro se elimina del tema porque la retención ha expirado, ¿se eliminará de la tienda de estado global? O la eliminación solo tendrá lugar en la tienda después de una restauración completa de la tienda desde el registro de cambios.

enfocar
fuente
2
Tenga en cuenta que la documentación anterior no señalaba el problema, y ​​acabamos de actualizar el documento como "solución intermedia".
Matthias J. Sax

Respuestas:

8

Sí, esta es una pequeña trampa bastante extraña, pero la documentación es correcta. El Procesador para una tienda de estado global no debe hacer nada a los registros sino persistirlos en la tienda.

AFAIK, este no es un tema filosófico, solo práctico. La razón es simplemente el comportamiento que observa ... Streams trata el tema de entrada como un tema de registro de cambios para la tienda y, por lo tanto, omite el procesador (así como la deserialización) durante la restauración.

La razón por la que la restauración de estado omite cualquier procesamiento es que generalmente los datos en un registro de cambios son idénticos a los datos en la tienda, por lo que en realidad sería un error hacer algo nuevo. Además, es más eficiente simplemente quitar los bytes del cable y escribirlos en masa en las tiendas estatales. Digo "generalmente" porque en este caso, el tema de entrada no es exactamente como un tema de registro de cambios normal, en el sentido de que no recibe sus escrituras durante las visitas a la tienda.

Por lo que vale, también me cuesta entender el caso de uso. Aparentemente, deberíamos:

  1. Deshágase de ese procesador por completo y siempre descargue los datos binarios del cable a las tiendas, tal como lo hace la restauración.
  2. Rediseñe las tiendas globales para permitir transformaciones arbitrarias antes de la tienda global. Podríamos:
    • continuar usando el tema de entrada y deserializar e invocar los procesadores durante la restauración también, O
    • agregue un registro de cambios real para las tiendas globales, de modo que encuestamos el tema de entrada, apliquemos algunas transformaciones y luego escribamos en la tienda global y en el registro de cambios de la tienda global. Luego, podemos usar el registro de cambios (no la entrada) para la restauración y la replicación.

Por cierto, si desea el último comportamiento, puede aproximarlo ahora mismo aplicando sus transformaciones y luego usándolo to(my-global-changelog)para fabricar un tema de "registro de cambios". Luego, crearía la tienda global para leer su en my-global-changeloglugar de la entrada.

Entonces, para darle una respuesta directa, KAFKA-7663 no es un error. Comentaré el ticket que propone convertirlo en una solicitud de función.

Respuesta adicional: los temas que actúan como registros de cambios para las tiendas estatales no deben configurarse con retención. En términos prácticos, esto significa que debe evitar el crecimiento infinito al habilitar la compactación y deshabilitar la retención de registros.

En la práctica, los datos antiguos que se caen de la retención y se eliminan no son un "evento", y los consumidores no tienen forma de saber si sucede o cuándo ocurre. Por lo tanto, no es posible eliminar datos de los almacenes estatales en respuesta a este no evento. Ocurriría como lo describe ... los registros se quedarían allí indefinidamente en la tienda global. Si / cuando se reemplaza una instancia, la nueva se restauraría a partir de la entrada y (obviamente) solo recibiría registros que existan en el tema en ese momento. Por lo tanto, el conjunto de Streams en su conjunto terminaría con una visión inconsistente del estado global. Es por eso que debes deshabilitar la retención.

La forma correcta de "descartar" datos antiguos de la tienda sería simplemente escribir una lápida para la clave deseada en el tema de entrada. Esto se propagaría correctamente a todos los miembros del clúster, se aplicaría correctamente durante la restauración y los intermediarios lo compactarían correctamente.

Espero que todo esto ayude. Definitivamente, ¡intervenga en el ticket y ayúdenos a dar forma a la API para que sea más intuitiva!

Juan
fuente
Sí, definitivamente ayuda mucho. Gracias por esta respuesta detallada :)
zoom
2
Para aclarar "Los temas que actúan como registros de cambios para los almacenes de estado no deben configurarse con retención": Esto significa que no debe configurar el tema para que caduque los datos después de que haya pasado un cierto tiempo, o después de que se haya excedido un cierto umbral de tamaño. En cambio, los datos deben conservarse 'para siempre' en el tema, y ​​permitir la compactación ayuda a garantizar que el tema aún no pueda crecer fuera de los límites.
Michael G. Noll
Estaba buscando la explicación. Muchas gracias
SunilS