Al definir una topología en flujos kafka, se puede agregar una tienda de estado global. Necesitará un tema fuente, así como a ProcessorSupplier
. El procesador recibe registros y podría transformarlos teóricamente antes de agregarlos a la tienda. Pero en caso de restauración, los registros se insertan directamente desde el tema fuente (registro de cambios) en el almacén de estado global, omitiendo la eventual transformación realizada en el procesador.
+-------------+ +-------------+ +---------------+
| | | | | global |
|source topic -------------> processor +--------------> state |
|(changelog) | | | | store |
+-------------+ +-------------+ +---------------+
| ^
| |
+---------------------------------------------------------+
record directly inserted during restoration
StreamsBuilder # addGlobalStore (StoreBuilder storeBuilder, Tema de cadena, Consumido consumido, ProcessorSupplier stateUpdateSupplier) Agrega un StateStore global a la topología.
Según la documentación
NOTA: no debe usar el procesador para insertar registros transformados en el almacén de estado global . Esta tienda utiliza el tema de origen como registro de cambios y durante la restauración insertará registros directamente desde el origen . Este ProcessorNode debe usarse para mantener actualizado StateStore.
Paralelamente, ya que el error principal está actualmente abierto en el rastreador de errores kafka: el procesador personalizado KAFKA-7663 suministrado en addGlobalStore no se utiliza al restaurar el estado del tema que explica exactamente lo que se indica en la documentación, pero parece ser un error aceptado.
Me pregunto si KAFKA-7663 es realmente un error o no. Según la documentación, parece haber sido diseñado de esta manera, en cuyo caso me cuesta entender el caso de uso.
¿Alguien puede explicar los principales casos de uso de esta API de bajo nivel? Lo único que se me ocurre es procesar los efectos secundarios, como por ejemplo, realizar algunas operaciones de registro en el procesador.
Pregunta adicional: si el tema de origen actúa como el registro de cambios de la tienda global, cuando un registro se elimina del tema porque la retención ha expirado, ¿se eliminará de la tienda de estado global? O la eliminación solo tendrá lugar en la tienda después de una restauración completa de la tienda desde el registro de cambios.
fuente
Respuestas:
Sí, esta es una pequeña trampa bastante extraña, pero la documentación es correcta. El Procesador para una tienda de estado global no debe hacer nada a los registros sino persistirlos en la tienda.
AFAIK, este no es un tema filosófico, solo práctico. La razón es simplemente el comportamiento que observa ... Streams trata el tema de entrada como un tema de registro de cambios para la tienda y, por lo tanto, omite el procesador (así como la deserialización) durante la restauración.
La razón por la que la restauración de estado omite cualquier procesamiento es que generalmente los datos en un registro de cambios son idénticos a los datos en la tienda, por lo que en realidad sería un error hacer algo nuevo. Además, es más eficiente simplemente quitar los bytes del cable y escribirlos en masa en las tiendas estatales. Digo "generalmente" porque en este caso, el tema de entrada no es exactamente como un tema de registro de cambios normal, en el sentido de que no recibe sus escrituras durante las visitas a la tienda.
Por lo que vale, también me cuesta entender el caso de uso. Aparentemente, deberíamos:
Por cierto, si desea el último comportamiento, puede aproximarlo ahora mismo aplicando sus transformaciones y luego usándolo
to(my-global-changelog)
para fabricar un tema de "registro de cambios". Luego, crearía la tienda global para leer su enmy-global-changelog
lugar de la entrada.Entonces, para darle una respuesta directa, KAFKA-7663 no es un error. Comentaré el ticket que propone convertirlo en una solicitud de función.
Respuesta adicional: los temas que actúan como registros de cambios para las tiendas estatales no deben configurarse con retención. En términos prácticos, esto significa que debe evitar el crecimiento infinito al habilitar la compactación y deshabilitar la retención de registros.
En la práctica, los datos antiguos que se caen de la retención y se eliminan no son un "evento", y los consumidores no tienen forma de saber si sucede o cuándo ocurre. Por lo tanto, no es posible eliminar datos de los almacenes estatales en respuesta a este no evento. Ocurriría como lo describe ... los registros se quedarían allí indefinidamente en la tienda global. Si / cuando se reemplaza una instancia, la nueva se restauraría a partir de la entrada y (obviamente) solo recibiría registros que existan en el tema en ese momento. Por lo tanto, el conjunto de Streams en su conjunto terminaría con una visión inconsistente del estado global. Es por eso que debes deshabilitar la retención.
La forma correcta de "descartar" datos antiguos de la tienda sería simplemente escribir una lápida para la clave deseada en el tema de entrada. Esto se propagaría correctamente a todos los miembros del clúster, se aplicaría correctamente durante la restauración y los intermediarios lo compactarían correctamente.
Espero que todo esto ayude. Definitivamente, ¡intervenga en el ticket y ayúdenos a dar forma a la API para que sea más intuitiva!
fuente