Estoy tratando de mover archivos s3 de un depósito "sin eliminación" (lo que significa que no puedo eliminar los archivos) a GCS usando el flujo de aire. No puedo garantizar que haya nuevos archivos todos los días, pero debo buscar nuevos archivos todos los días.
Mi problema es la creación dinámica de subdags. Si hay archivos, necesito subdags. Si NO hay archivos, no necesito subdags. Mi problema es la configuración aguas arriba / aguas abajo. En mi código, detecta archivos, pero no inicia los subdags como se supone que deben hacerlo. Me falta algo
Aquí está mi código:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
fuente
fuente
spark
trabajos o algúnpython
script y qué está utilizando para ejecutarlolivy
o algún otro métodofiles
una lista vacía?Respuestas:
A continuación se muestra la forma recomendada de crear un DAG dinámico o sub-DAG en el flujo de aire, aunque también hay otras formas, pero supongo que esto se aplicaría en gran medida a su problema.
Primero, cree un archivo
(yaml/csv)
que incluya la lista de todos loss3
archivos y ubicaciones, en su caso ha escrito una función para almacenarlos en la lista, diría que los almacene en unyaml
archivo separado y cárguelo en tiempo de ejecución en el flujo de aire env y luego cree DAGA continuación se muestra un
yaml
archivo de muestra :dynamicDagConfigFile.yaml
Puede modificar su
Check_For_Files
función para almacenarlos en unyaml
archivo.Ahora podemos pasar a la creación dinámica de dag:
Primero defina dos tareas utilizando operadores ficticios, es decir, la tarea inicial y la final. Tales tareas son aquellas en las que vamos a construir sobre las nuestras
DAG
creando dinámicamente tareas entre ellas:DAG dinámico: lo usaremos
PythonOperators
en el flujo de aire. La función debe recibir como argumentos el id de la tarea; una función de python a ejecutar, es decir, python_callable para el operador de Python; y un conjunto de argumentos para ser utilizados durante la ejecución.Incluya un argumento el
task id
. Por lo tanto, podemos intercambiar datos entre tareas generadas de forma dinámica, por ejemplo, víaXCOM
.Puede especificar su función de operación dentro de este dag dinámico como
s3_to_gcs_op
.Finalmente, según la ubicación presente en el archivo yaml, puede crear dags dinámicos, primero lea el
yaml
archivo como se muestra a continuación y cree dag dinámico:Definición final de DAG:
La idea es que
Código de flujo de aire completo en orden:
fuente
upload_s3_toGCS
no existirá y se producirá un error en el flujo de aire.yaml
archivo una vez que todos estos archivos se cargan en GCS, de esta manera solo los archivos nuevos estarán presentes en elyaml
archivo. Y en caso de que no haya archivos nuevos, elyaml
archivo estará vacío y no se creará dag dinámico. Es por eso que elyaml
archivo es una opción mucho mejor en comparación con el almacenamiento de archivos en una lista.yaml
archivo también ayudará a mantener el registro de los archivos s3 de una manera, si se supone que parte del archivo s3 no se carga en GCS, también puede mantener un indicador correspondiente a ese archivo y luego volver a intentarlo en la próxima ejecución del DAG.if
condición antes del DAG que verificaráyaml
si hay archivos nuevos en los archivos, si hay archivos nuevos, ejecútelos; de lo contrario, omítalo.