dev course - DE/TIL

[데브코스] TIL 42일차

nani-jin 2024. 5. 26. 13:43

1. Airflow 설치 방법

  • 클라우드 사용(프로덕션 환경에서 선호됨)

 

 

2. Airflow 코드의 기본 구조

  1. DAG(Airflow에서 ETL을 뜻하는 것) 대표하는 객체를 먼저 만듦
    • 이름, 실행 주기, 실행 날짜, 오너 등
  2. 다음으로 DAG를 구성하는 태스크들을 만듦
    • 태스크 별로 적합한 오퍼레이터 선택
    • 태스크 ID를 부여하고, 해야할 작업의 세부사항 지정
    • 모든 DAG가 스케줄을 가져야하는 것은 아님
  3. 마지막으로, 태스크들간의 실행 순서 결정
## DAG를 대표하는 객체 만들기
from datetime import datetime, timedelta

default_args = {
    'owner':'yejin',
    'email':['***@naver.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=3), # 이 부분에 적용할 수 있는 필드가 많음. 태스크 레벨에서 적용됨
}

## DAG를 구성하는 태스크 만들기
from airflow import DAG

dag = DAG(
    "dag_v1", # DAG name
    start_date = datetime(2020,8,7,hour=0,minute=00), # 과거로 설정하면, 과거부터 현재까지 실행 안된 부분에 대해 catup하려고함
    schedule="0 * * * *",
    tags=["example"],
    catchup=False, # Full refresh인 경우, False로 두는 것이 좋음
    default_args=default_args # common settings # 앞서 지정한 default_args 지정
)

 

 

 

 

3. Airflow 코드 예제 - Bash operator 사용

  • 3개의 태스크로 구성되며,
    • t1 - 현재 시간 출력
    • t2 - 5초간 대기 후 종료
    • t3 - 서버의 /tmp 디렉토리의 내용 출력
  •  t1이 끝난 뒤, t2와 t3 병렬 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# DAG 대표할 객체 만들기
default_args = {
    'owner':'keeyong',
    'start_date': datetime(2024,05,25,hour=0,minute=00),
    'email':['***@naver.com'],
    'retries':1,
    'retry_delay':timedelta(minutes=3),
}

# 태스크 만들기
test_dag = DAG(
    "dag_v1",
    schedule="0 9 * * *",
    tags=['test'],
    catchUp=False,
    default_args=default_args
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=test_dag)
    
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=test_dag)

t3 = BashOperator(
    task_id='ls',
    bash_command='ls /tmp',
    dat=test_dag)
    
# 순서
t1 >> [t2,t3]

 

  • DAG trigger - 터미널에서 실행
    • 먼저 Airflow 서버 로그인 후 다음 명령 실행
    • airflow dags list
    • airflow tasks list DAG이름
    • airflow tasks test DAG이름 Task이름 날짜 # test vs. run
      • 날짜는 YYYY-MM-DD
      • start_date보다 과거인 경우는 실행되지만, 오늘 날짜보다 미래인 경우 실행 안됨
      • 이 부분이 바로 execution_date의 값이 됨
docker ps # 현재 도커에 띄운 리스트
docker exec -it e1bc7f85eabd sh # 그중 우리는 scheduler id로 실행

airflow dags list # DAG 리스트
airflow tasks list dag_v1 # 태스크 리스트
airflow tasks test dag_v1 ls 2020-08-09 # test vs. run
airflow dags test dag_v1 2019-12-08
airflow dags backfill dag_v1 -s 2019-01-01 -e 2019-12-31