본문 바로가기

troubleshooting

[airflow] task dependency

task decorator로 코드를 야심차게 짜고 딱 코드를 실행하려고 하니, airflow web ui에서 다음과 같은 그래프가 그려졌다. 내가 의도했던 바는 get_high_frequency_airports >> api_call >> raw_to_stage >> data_to_raw >> update_redshift >> unload_redshift_to_s3 >> update_rds 였는데, 왜 이런일이 일어났을까?

 

이 상태의 task 실행 순서를 선언한 코드는 다음과 같았다

def dag():
    ...
    ...
    ...

    current_date = '{{ ds }}'
    search_date = {f'date_{days}': f'{{{{ macros.ds_add(ds, {days}) }}}}' for days in range(1, 31)}
    
    departures = get_high_frequency_airports()
    dataset_list = api_call(departures, search_date)
    
    data_to_raw(dataset_list, current_date)
    raw_to_stage(dataset_list, current_date)
    update_redshift(dataset_list, current_date)
    
    s3_key_to_prod = unload_redshift_to_s3(current_date)
    update_rds(s3_key_to_prod)
    
dag=dag()

 

위 그래프가 그려진 이유를 분석해보니,

  1. api_call로 받은 dataset_list를 data_to_raw, raw_to_stage, update_redshift 모두에 전달하고 있었다. 이로 인해 api_call과 이어진 세 개의 병렬 태스크로 그려졌다
  2. unload_redshift_to_s3는 앞선 태스크에서 받아 넘어오는 값 없이 독립적으로 실행되고 update_rds로 값을 넘겨줬다. 이로 인해 dag 실행시 api_call의 성공 여부와 별개로 독립적인 태스크로 그려졌다

 

내가 원하던 방향성이 아니어서 계속 task가 fail... 되었고 이를 고쳐보고자 방법을 찾았다. 코드로 살펴보면,

    api_call_task = api_call(departures, search_date)
    
    data_to_raw_task = data_to_raw(api_call_task, current_date)
    raw_to_stage_task = raw_to_stage(data_to_raw_task, current_date)
    update_redshift_task = update_redshift(raw_to_stage_task, current_date)
    
    unload_s3_task = unload_redshift_to_s3(update_redshift_task)
    update_rds_task = update_rds(unload_s3_task)
    
    departures >> api_call_task >> data_to_raw_task >> raw_to_stage_task >> update_redshift_task >> unload_s3_task >> update_rds_task

 

모든 태스크에 return을 두고, 다음 태스크로 넘기는 방향을 선택했고 마지막에 순서를 명확히해 task간 종속성을 선언해줬다. 쨔잔

 

 

[마무리]

task 간 종속성은, '어떤 값을 인자로 받아서 넘기는가?'하는 요소가 정말 중요했다. '그냥 순서대로 함수를 선언하면 되는거 아니야?'하며 짰었는데  함수의 인자와 반환값이 무엇인지 명확히 해야 오케스트레이션인 airflow가 잘 실행해 준다는 사실을 깨달았다

 

[추가 주의사항]

너무 큰 데이터를 return(xcom 사용)으로 넘기면 오류가 날 확률이 높다.  xcom의 설계 목적은 애초에 작은 데이터를 주고 받는 것이다. 내가 사용하고 있는 db는 postgres고 xcom의 최대 길이는 1GB(byte 기준)으로 알고 있는데, 이를 넘기면 xcom이 넘겨주지 못한다...  따라서 작고 간단한 데이터를 넘기는 경우가 아니라면, s3와 같은 저장소에 한번 저장하고 그걸 불러와 사용하는 것이 좋다!