tratando de crear subdags dinámicos a partir de datos primarios basados ​​en una matriz de nombres de archivo

10

Estoy tratando de mover archivos s3 de un depósito "sin eliminación" (lo que significa que no puedo eliminar los archivos) a GCS usando el flujo de aire. No puedo garantizar que haya nuevos archivos todos los días, pero debo buscar nuevos archivos todos los días.

Mi problema es la creación dinámica de subdags. Si hay archivos, necesito subdags. Si NO hay archivos, no necesito subdags. Mi problema es la configuración aguas arriba / aguas abajo. En mi código, detecta archivos, pero no inicia los subdags como se supone que deben hacerlo. Me falta algo

Aquí está mi código:

from airflow import models
from  airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging

args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': True,
}

bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []

parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)

def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ##proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'

def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)

    # dag
    subdag = models.DAG(dag_id=dag_id_child,
              default_args=args,
              schedule_interval=None)

    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)


    return subdag

def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)

if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished


check_for_files >> finished
arcee123
fuente
Qué tipo de trabajo se ejecuta en el backend de estos DAGS son estos sparktrabajos o algún pythonscript y qué está utilizando para ejecutarlo livyo algún otro método
ashwin agrawal
Lo siento, no entiendo la pregunta. ¿puedes por favor repetir?
arcee123
Quiero decir que solo estás usando scripts de Python simples y no estás usando ningún trabajo de chispa, ¿verdad?
Ashwin Agrawal
Si. operadores simples que son predeterminados en el flujo de aire. Quiero agregar operadores existentes a una velocidad dinámica basada en archivos marcados en S3 que quiero incorporar a GCS.
arcee123
¿Por qué hay filesuna lista vacía?
Oluwafemi Sule

Respuestas:

3

A continuación se muestra la forma recomendada de crear un DAG dinámico o sub-DAG en el flujo de aire, aunque también hay otras formas, pero supongo que esto se aplicaría en gran medida a su problema.

Primero, cree un archivo (yaml/csv)que incluya la lista de todos los s3archivos y ubicaciones, en su caso ha escrito una función para almacenarlos en la lista, diría que los almacene en un yamlarchivo separado y cárguelo en tiempo de ejecución en el flujo de aire env y luego cree DAG

A continuación se muestra un yamlarchivo de muestra : dynamicDagConfigFile.yaml

job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
    - File1: 'S3Loc1'
    - File2: 'S3Loc2'
    - File3: 'S3Loc3'

Puede modificar su Check_For_Filesfunción para almacenarlos en un yamlarchivo.

Ahora podemos pasar a la creación dinámica de dag:

Primero defina dos tareas utilizando operadores ficticios, es decir, la tarea inicial y la final. Tales tareas son aquellas en las que vamos a construir sobre las nuestras DAGcreando dinámicamente tareas entre ellas:

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

DAG dinámico: lo usaremos PythonOperatorsen el flujo de aire. La función debe recibir como argumentos el id de la tarea; una función de python a ejecutar, es decir, python_callable para el operador de Python; y un conjunto de argumentos para ser utilizados durante la ejecución.

Incluya un argumento el task id. Por lo tanto, podemos intercambiar datos entre tareas generadas de forma dinámica, por ejemplo, vía XCOM.

Puede especificar su función de operación dentro de este dag dinámico como s3_to_gcs_op.

def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Finalmente, según la ubicación presente en el archivo yaml, puede crear dags dinámicos, primero lea el yamlarchivo como se muestra a continuación y cree dag dinámico:

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.

Definición final de DAG:

La idea es que

#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks. 
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end

Código de flujo de aire completo en orden:

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)



with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.


start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
ashwin agrawal
fuente
Muchas gracias. Entonces, ¿uno de los problemas que tuve fue qué sucede si no hay archivos nuevos? Uno de los problemas que enfrento es que siempre habrá archivos en este lugar, pero no se garantiza que NUEVOS archivos se extraigan, lo que significa que la sección upload_s3_toGCSno existirá y se producirá un error en el flujo de aire.
arcee123
Puede resolver el problema eliminando los archivos del yamlarchivo una vez que todos estos archivos se cargan en GCS, de esta manera solo los archivos nuevos estarán presentes en el yamlarchivo. Y en caso de que no haya archivos nuevos, el yamlarchivo estará vacío y no se creará dag dinámico. Es por eso que el yamlarchivo es una opción mucho mejor en comparación con el almacenamiento de archivos en una lista.
Ashwin Agrawal
El yamlarchivo también ayudará a mantener el registro de los archivos s3 de una manera, si se supone que parte del archivo s3 no se carga en GCS, también puede mantener un indicador correspondiente a ese archivo y luego volver a intentarlo en la próxima ejecución del DAG.
Ashwin Agrawal
Y si no hay archivos nuevos, puede poner una ifcondición antes del DAG que verificará yamlsi hay archivos nuevos en los archivos, si hay archivos nuevos, ejecútelos; de lo contrario, omítalo.
Ashwin Agrawal
El problema aquí es que se establecen aguas abajo. si las aguas abajo se configuran sin los trabajos reales (porque no existen archivos), se producirá un error.
arcee123