Tengo un escenario como el siguiente:
- Gatillo una
Task 1
yTask 2
sólo cuando los nuevos datos es avialable para ellos en la tabla de origen (Athena). El desencadenante para la Tarea1 y la Tarea2 debería ocurrir cuando se realiza una nueva partición de datos en un día. - Activar
Task 3
solo al completarTask 1
yTask 2
- Activa
Task 4
solo la finalización deTask 3
Mi código
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
¿Cuál es la mejor forma óptima de lograrlo?
Task1
yTask2
va en bucle. Para mí, los datos se cargan en la tabla fuente de Athena 10 AM CET.Respuestas:
Creo que su pregunta aborda dos problemas principales:
schedule_interval
manera explícita para que @daily esté configurando algo que no espera.la respuesta corta: establezca explícitamente su horario_intervalo con un formato de trabajo cron y use operadores de sensores para verificar de vez en cuando
dónde
startime
es a qué hora comenzará su tarea diaria,endtime
cuál es la última hora del día que debe verificar si se realizó un evento antes de marcarlo como fallido ypoke_time
es el intervalo en elsensor_operator
que verificará si sucedió el evento.Cómo abordar el trabajo cron explícitamente cada vez que configura su dag
@daily
como lo hizo:de los documentos , puedes ver que realmente estás haciendo:
@daily - Run once a day at midnight
Lo que ahora tiene sentido por qué obtiene un error de tiempo de espera y falla después de 5 minutos porque configuró
'retries': 1
y'retry_delay': timedelta(minutes=5)
. Entonces intenta ejecutar el dag a medianoche, falla. vuelve a intentarlo 5 minutos después y vuelve a fallar, por lo que se marca como fallido.Básicamente, @daily run está configurando un trabajo cron implícito de:
El formato del trabajo cron es del siguiente formato y establece el valor en
*
siempre que quiera decir "todos".Minute Hour Day_of_Month Month Day_of_Week
Entonces @daily básicamente dice ejecutar esto cada: minuto 0 hora 0 de todos los días_del_mes de todos los meses de todos los días_de_semana
Por lo tanto, su caso se ejecuta cada: minuto 0 hora 10 de todos los días_de_mes de todos los_mes de todos los días_de_semana. Esto se traduce en formato de trabajo cron a:
Cómo activar y volver a intentar correctamente la ejecución del dag cuando depende de un evento externo para completar la ejecución
usted podría desencadenar un DAG en el flujo de aire de un evento externo utilizando el comando
airflow trigger_dag
. esto sería posible si de alguna manera pudieras activar una función lambda / script de python para apuntar a tu instancia de flujo de aire.Si no puede activar el dag externamente, use un operador de sensor como lo hizo OP, establezca un poke_time y establezca un número alto razonable de reintentos.
fuente
airflow trigger_dag
. esto sería posible si de alguna manera pudieras activar una función lambda / script de python para apuntar a tu instancia de flujo de aire.AwsGlueCatalogPartitionSensor
junto con el comando de flujo de aire{{ds_nodash}}
para las salidas de partición. Mi pregunta entonces cómo programar esto.