¿Cómo desencadenar una tarea Airflow solo cuando la nueva partición / datos está disponible en la tabla de athena de AWS usando DAG en python?

9

Tengo un escenario como el siguiente:

  1. Gatillo una Task 1y Task 2sólo cuando los nuevos datos es avialable para ellos en la tabla de origen (Athena). El desencadenante para la Tarea1 y la Tarea2 debería ocurrir cuando se realiza una nueva partición de datos en un día.
  2. Activar Task 3solo al completar Task 1yTask 2
  3. Activa Task 4solo la finalización deTask 3

ingrese la descripción de la imagen aquí

Mi código

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

¿Cuál es la mejor forma óptima de lograrlo?

pankaj
fuente
¿Tienes algún problema con esta solución?
Bernardo Stearns resucitó el
@ Bernardostearnsreisen, a veces el Task1y Task2va en bucle. Para mí, los datos se cargan en la tabla fuente de Athena 10 AM CET.
pankaj
¿quiere decir que el flujo de aire vuelve a intentar la Tarea1 y la Tarea2 muchas veces hasta que tenga éxito?
Bernardo Stearns resucitó el
@Bernardostearnsreisen, sí exactamente
pankaj
1
@Bernardostearnsreisen, no sabía cómo otorgar la recompensa :)
pankaj

Respuestas:

1

Creo que su pregunta aborda dos problemas principales:

  1. olvidando configurarlo de schedule_intervalmanera explícita para que @daily esté configurando algo que no espera.
  2. Cómo activar y volver a intentar correctamente la ejecución del dag cuando depende de un evento externo para completar la ejecución

la respuesta corta: establezca explícitamente su horario_intervalo con un formato de trabajo cron y use operadores de sensores para verificar de vez en cuando

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

dónde startimees a qué hora comenzará su tarea diaria, endtimecuál es la última hora del día que debe verificar si se realizó un evento antes de marcarlo como fallido ypoke_time es el intervalo en el sensor_operatorque verificará si sucedió el evento.

Cómo abordar el trabajo cron explícitamente cada vez que configura su dag@dailycomo lo hizo:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

de los documentos , puedes ver que realmente estás haciendo: @daily - Run once a day at midnight

Lo que ahora tiene sentido por qué obtiene un error de tiempo de espera y falla después de 5 minutos porque configuró 'retries': 1y 'retry_delay': timedelta(minutes=5). Entonces intenta ejecutar el dag a medianoche, falla. vuelve a intentarlo 5 minutos después y vuelve a fallar, por lo que se marca como fallido.

Básicamente, @daily run está configurando un trabajo cron implícito de:

@daily -> Run once a day at midnight -> 0 0 * * *

El formato del trabajo cron es del siguiente formato y establece el valor en * siempre que quiera decir "todos".

Minute Hour Day_of_Month Month Day_of_Week

Entonces @daily básicamente dice ejecutar esto cada: minuto 0 hora 0 de todos los días_del_mes de todos los meses de todos los días_de_semana

Por lo tanto, su caso se ejecuta cada: minuto 0 hora 10 de todos los días_de_mes de todos los_mes de todos los días_de_semana. Esto se traduce en formato de trabajo cron a:

0 10 * * *

Cómo activar y volver a intentar correctamente la ejecución del dag cuando depende de un evento externo para completar la ejecución

  1. usted podría desencadenar un DAG en el flujo de aire de un evento externo utilizando el comando airflow trigger_dag. esto sería posible si de alguna manera pudieras activar una función lambda / script de python para apuntar a tu instancia de flujo de aire.

  2. Si no puede activar el dag externamente, use un operador de sensor como lo hizo OP, establezca un poke_time y establezca un número alto razonable de reintentos.

Bernardo stearns reisen
fuente
Gracias por esto. Además, si deseo activar las tareas en función del evento en lugar del tiempo, es decir, tan pronto como se pueda activar la nueva partición de datos en la fuente, se debe activar la siguiente tarea de `AWS Athena Tables`. Entonces, ¿cómo programo? ¿Es mi código actual lo suficientemente apto?
pankaj
@pankaj, solo veo dos alternativas. No sé mucho sobre aws athena, pero podría activar un dag en el flujo de aire de un evento externo mediante el comando airflow trigger_dag. esto sería posible si de alguna manera pudieras activar una función lambda / script de python para apuntar a tu instancia de flujo de aire.
Bernardo Stearns resucitó el
la otra alternativa es más o menos lo que está haciendo, porque no tiene un desencadenante basado en eventos, debe verificar periódicamente si este evento sucedió. Por lo tanto, el uso de esta solución actual establecería un trabajo cron durante un rango de horas ejecutando el dag en una alta frecuencia de minutos ... muchos fallarán pero podrán capturar bastante rápido después de que ocurra el evento
Bernardo stearns resisen
@ Bernado, he descubierto el paquete en Airflow llamado AwsGlueCatalogPartitionSensorjunto con el comando de flujo de aire {{ds_nodash}}para las salidas de partición. Mi pregunta entonces cómo programar esto.
pankaj
@Benado, ¿Puedes mirar mi código donde he implementado la verificación mencionada anteriormente y dar tus aportes?
pankaj