Estoy tratando de mover archivos s3 de un depósito "sin eliminación" (lo que significa que no puedo eliminar los archivos) a GCS usando el flujo de aire. No puedo garantizar que haya nuevos archivos todos los días, pero debo buscar nuevos archivos todos los días.
Mi problema es la creación dinámica de subdags. Si hay archivos, necesito subdags. Si NO hay archivos, no necesito subdags. Mi problema es la configuración aguas arriba / aguas abajo. En mi código, detecta archivos, pero no inicia los subdags como se supone que deben hacerlo. Me falta algo
Aquí está mi código:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
fuente
fuente

sparktrabajos o algúnpythonscript y qué está utilizando para ejecutarlolivyo algún otro métodofilesuna lista vacía?Respuestas:
A continuación se muestra la forma recomendada de crear un DAG dinámico o sub-DAG en el flujo de aire, aunque también hay otras formas, pero supongo que esto se aplicaría en gran medida a su problema.
Primero, cree un archivo
(yaml/csv)que incluya la lista de todos loss3archivos y ubicaciones, en su caso ha escrito una función para almacenarlos en la lista, diría que los almacene en unyamlarchivo separado y cárguelo en tiempo de ejecución en el flujo de aire env y luego cree DAGA continuación se muestra un
yamlarchivo de muestra :dynamicDagConfigFile.yamlPuede modificar su
Check_For_Filesfunción para almacenarlos en unyamlarchivo.Ahora podemos pasar a la creación dinámica de dag:
Primero defina dos tareas utilizando operadores ficticios, es decir, la tarea inicial y la final. Tales tareas son aquellas en las que vamos a construir sobre las nuestras
DAGcreando dinámicamente tareas entre ellas:DAG dinámico: lo usaremos
PythonOperatorsen el flujo de aire. La función debe recibir como argumentos el id de la tarea; una función de python a ejecutar, es decir, python_callable para el operador de Python; y un conjunto de argumentos para ser utilizados durante la ejecución.Incluya un argumento el
task id. Por lo tanto, podemos intercambiar datos entre tareas generadas de forma dinámica, por ejemplo, víaXCOM.Puede especificar su función de operación dentro de este dag dinámico como
s3_to_gcs_op.Finalmente, según la ubicación presente en el archivo yaml, puede crear dags dinámicos, primero lea el
yamlarchivo como se muestra a continuación y cree dag dinámico:Definición final de DAG:
La idea es que
Código de flujo de aire completo en orden:
fuente
upload_s3_toGCSno existirá y se producirá un error en el flujo de aire.yamlarchivo una vez que todos estos archivos se cargan en GCS, de esta manera solo los archivos nuevos estarán presentes en elyamlarchivo. Y en caso de que no haya archivos nuevos, elyamlarchivo estará vacío y no se creará dag dinámico. Es por eso que elyamlarchivo es una opción mucho mejor en comparación con el almacenamiento de archivos en una lista.yamlarchivo también ayudará a mantener el registro de los archivos s3 de una manera, si se supone que parte del archivo s3 no se carga en GCS, también puede mantener un indicador correspondiente a ese archivo y luego volver a intentarlo en la próxima ejecución del DAG.ifcondición antes del DAG que verificaráyamlsi hay archivos nuevos en los archivos, si hay archivos nuevos, ejecútelos; de lo contrario, omítalo.