Calcule la velocidad promedio de las carreteras [cerrado]

20

Fui a una entrevista de trabajo de ingeniero de datos. El entrevistador me hizo una pregunta. Me dio una situación y me pidió que diseñara el flujo de datos para ese sistema. Lo resolví pero a él no le gustó mi solución y fallé. Me gustaría saber si tienes mejores ideas sobre cómo resolver ese desafío.

La pregunta fue:

Nuestro sistema recibe cuatro flujos de datos. Los datos contienen una identificación del vehículo, velocidad y coordinaciones de geolocalización. Cada vihicle envía sus datos una vez por minuto. No hay conexión entre una secuencia específica a una carretera o vihicle específico o cualquier otra cosa. Hay una función que acepta coordinaciones y devuelve un nombre de sección de carretera. Necesitamos conocer la velocidad promedio por tramo de carretera por 5 minutos. Finalmente queremos escribir los resultados a Kafka.

ingrese la descripción de la imagen aquí

Entonces mi solución fue:

Primero, escriba todos los datos en un grupo de Kafka, en un tema, dividido por los 5-6 primeros dígitos de la latitud concatenados con los 5-6 primeros dígitos de la longitud. Luego, lea los datos por Structured Streaming, agregue para cada fila el nombre de la sección de la carretera por las coordinaciones (hay un udf predefinido para eso), y luego combine los datos por el nombre de la sección de la carretera.

Como particiono los datos en Kafka por los primeros 5-6 dígitos de las coordinaciones, después de traducir las coordinaciones al nombre de la sección, no es necesario transferir muchos datos a la partición correcta y, por lo tanto, puedo aprovechar la operación colesce () eso no desencadena una barajadura completa.

Luego calculando la velocidad promedio por ejecutor.

Todo el proceso ocurrirá cada 5 minutos y escribiremos los datos en modo Añadir al sumidero Kafka final.

ingrese la descripción de la imagen aquí

De nuevo, al entrevistador no le gustó mi solución. ¿Alguien podría sugerir cómo mejorarlo o una idea completamente diferente y mejor?

solo
fuente
¿No sería mejor preguntarle a la persona qué no le gustó exactamente?
Gino Pane
Creo que es una mala idea dividir por el concatenado lat-long. ¿No se informará el punto de datos para cada carril como una coordenada ligeramente diferente?
webber
@webber, por lo tanto, solo tomo unos pocos dígitos, por lo que la posición no será única sino relativamente del tamaño de una sección de carretera.
Alon

Respuestas:

6

Encontré esta pregunta muy interesante y pensé en intentarlo.

Como evalué más adelante, su intento en sí es bueno, excepto lo siguiente:

dividido por los 5-6 primeros dígitos de la latitud concatenados con los 5-6 primeros dígitos de la longitud

Si ya tiene un método para obtener la identificación / nombre de la sección de la carretera en función de la latitud y la longitud, ¿por qué no llamar primero a ese método y utilizar la identificación / nombre de la sección de la carretera para dividir los datos en primer lugar?

Y después de eso, todo es bastante fácil, por lo que la topología será

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key -> 
Use time windowed aggregation for the given time ->
Materialize it to a store. 

(Se puede encontrar una explicación más detallada en los comentarios en el código a continuación. Pregunte si algo no está claro)

He agregado el código al final de esta respuesta, tenga en cuenta que en lugar de la media, he usado la suma, ya que es más fácil de demostrar. Es posible hacer un promedio almacenando algunos datos adicionales.

He detallado la respuesta en los comentarios. A continuación se muestra un diagrama de topología generado a partir del código (gracias a https://zz85.github.io/kafka-streams-viz/ )

Topología:

Diagrama de topología

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }
Irshad PI
fuente
¿No es una mala idea fusionar todas las transmisiones? Esto puede convertirse en un cuello de botella para su flujo de datos. ¿Qué sucede cuando comienza a recibir más y más flujos de entrada a medida que su sistema crece? ¿Será esto escalable?
wypul
@wypul> ¿no es una mala idea fusionar todas las transmisiones? -> Creo que no. El paralelismo en Kafka no se logra a través de flujos, sino a través de particiones (y tareas), subprocesos, etc. Los flujos son una forma de agrupar los datos. > ¿Será escalable? -> si. Dado que estamos modificando por secciones de carretera y asumiendo que las secciones de carretera están distribuidas de manera equitativa, podemos aumentar el número de particiones para estos temas para procesar paralelamente el flujo en diferentes contenedores. Podemos usar un buen algoritmo de partición basado en la sección de la carretera para distribuir la carga entre las réplicas.
Irshad PI
1

El problema como tal parece simple y las soluciones ofrecidas ya tienen mucho sentido. Me pregunto si al entrevistador le preocupa el diseño y el rendimiento de la solución en la que se ha centrado o la precisión del resultado. Como otros se han centrado en el código, el diseño y el rendimiento, consideraré la precisión.

Solución de transmisión

A medida que los datos fluyen, podemos proporcionar una estimación aproximada de la velocidad promedio de una carretera. Esta estimación será útil para detectar la congestión, pero será errónea para determinar el límite de velocidad.

  1. Combina los 4 flujos de datos juntos.
  2. Cree una ventana de 5 minutos para capturar datos de las 4 transmisiones en 5 minutos.
  3. Aplique UDF en las coordenadas para obtener el nombre de la calle y el nombre de la ciudad. Los nombres de las calles a menudo se duplican en las ciudades, por lo que utilizaremos city-name + street-name como clave.
  4. Calcule la velocidad promedio con una sintaxis como -

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

Solución por lotes

Esta estimación estará desactivada porque el tamaño de la muestra es pequeño. Necesitaremos un procesamiento por lotes de datos completos de mes / trimestre / año para determinar con mayor precisión el límite de velocidad.

  1. Lea los datos de un año del lago de datos (o tema de Kafka)

  2. Aplique UDF en las coordenadas para obtener el nombre de la calle y el nombre de la ciudad.

  3. Calcule la velocidad promedio con una sintaxis como -


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. escriba el resultado en el lago de datos.

En base a este límite de velocidad más preciso, podemos predecir el tráfico lento en la aplicación de transmisión.

Salim
fuente
1

Veo algunos problemas con su estrategia de partición:

  • Cuando dice que va a particionar sus datos en función de los primeros 5-6 dígitos de lat de longitud, no podrá determinar de antemano el número de particiones kafka. Tendrá datos asimétricos, ya que en algunos tramos de carretera observará un volumen alto que otros.

  • Y su combinación de teclas no garantiza los mismos datos de la sección de carretera en la misma partición de todos modos y, por lo tanto, no puede estar seguro de que no se barajarán.

La información dada por la OMI no es suficiente para diseñar toda la tubería de datos. Porque al diseñar la tubería, la forma de particionar sus datos juega un papel importante. Debe preguntar más acerca de los datos que está recibiendo, como la cantidad de vehículos, el tamaño de los flujos de datos de entrada, ¿es fijo el número de flujos o puede aumentar en el futuro? ¿Los flujos de datos de entrada que está recibiendo son flujos kafka? ¿Cuántos datos recibes en 5 minutos?

  • Ahora supongamos que tiene 4 secuencias escritas en 4 temas en kafka o 4 particiones y no tiene ninguna clave específica, pero sus datos se particionan en función de alguna clave del centro de datos o se dividen en hash. De lo contrario, esto debería hacerse en el lado de los datos en lugar de deduplicar los datos en otra secuencia de kafka y particionar.
  • Si está recibiendo los datos en diferentes centros de datos, entonces necesita llevar los datos a un clúster y para ese propósito puede usar el fabricante de espejos Kafka o algo similar.
  • Después de tener todos los datos en un clúster, puede ejecutar un trabajo de transmisión estructurado allí y con un intervalo de activación de 5 minutos y una marca de agua según sus requisitos.
  • Para calcular el promedio y evitar muchos movimientos aleatorios, puede usar una combinación de mapValuesy en reduceByKeylugar de groupBy. Consulte este .
  • Puede escribir los datos en el sumidero kafka después del procesamiento.
wypul
fuente
mapValues ​​y reduceByKey pertenecen al RDD de bajo nivel. ¿Catalyst no es lo suficientemente inteligente como para generar el RDD más eficiente cuando agrupo y calculo el promedio?
Alon
@Alon Catalyst seguramente podrá encontrar el mejor plan para ejecutar su consulta, pero si usa groupBy, los datos con la misma clave se barajarán primero en la misma partición y luego aplicarán la operación agregada. mapValuesy de reduceByhecho pertenece a RDD de bajo nivel, pero aún funcionará mejor en esta situación, ya que primero calculará el agregado por partición y luego barajará.
wypul
0

Los principales problemas que veo con esta solución son:

  • Las secciones de carretera que se encuentran en el borde de los cuadrados de 6 dígitos del mapa tendrán datos en múltiples particiones de tema y tendrán múltiples velocidades promedio.
  • El tamaño de los datos de ingestión para sus particiones Kafka puede estar desequilibrado (ciudad versus desierto). Particionar por los primeros dígitos de la identificación del automóvil podría ser una buena idea de la OMI.
  • No estoy seguro de haber seguido la parte de fusión, pero suena problemático.

Yo diría que la solución debe hacerlo: leer desde el flujo de Kafka -> UDF -> sección de carretera groupby -> promedio -> escribir en el flujo de Kafka.

David Taub
fuente
0

Mi diseño dependería de

  1. Numero de caminos
  2. Numero de vehiculos
  3. Costo de cómputo del camino desde coordenadas

Si quiero escalar cualquier cantidad de conteos, el diseño se vería así ingrese la descripción de la imagen aquí

Preocupaciones cruzadas sobre este diseño:

  1. Mantener un estado duradero de las secuencias de entrada (si la entrada es kafka, podemos almacenar compensaciones con Kafka o externamente)
  2. Periódicamente, los estados de punto de control al sistema externo (prefiero usar barreras de punto de control asíncronas en Flink )

Algunas mejoras prácticas posibles en este diseño:

  1. Función de mapeo de secciones de carreteras en caché si es posible, en función de las carreteras
  2. Manejo de pings perdidos (en la práctica, no todos los ping están disponibles)
  3. Teniendo en cuenta la curvatura de la carretera (rumbo y altitud en cuenta)
yugandhar
fuente