dev course - DE/TIL
[데브코스] TIL 42일차
nani-jin
2024. 5. 26. 13:43
1. Airflow 설치 방법
- 직접 설치 후 운영
- 개인 랩탑에 Docker 설치 후 Airflow 설치
- AWS EC2 등의 리눅스 서버에 직접 설치 - 우분투 20.04 사용
- 클라우드 사용(프로덕션 환경에서 선호됨)
2. Airflow 코드의 기본 구조
- DAG(Airflow에서 ETL을 뜻하는 것) 대표하는 객체를 먼저 만듦
- 이름, 실행 주기, 실행 날짜, 오너 등
- 다음으로 DAG를 구성하는 태스크들을 만듦
- 태스크 별로 적합한 오퍼레이터 선택
- 태스크 ID를 부여하고, 해야할 작업의 세부사항 지정
- 모든 DAG가 스케줄을 가져야하는 것은 아님
- 마지막으로, 태스크들간의 실행 순서 결정
## DAG를 대표하는 객체 만들기
from datetime import datetime, timedelta
default_args = {
'owner':'yejin',
'email':['***@naver.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3), # 이 부분에 적용할 수 있는 필드가 많음. 태스크 레벨에서 적용됨
}
## DAG를 구성하는 태스크 만들기
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date = datetime(2020,8,7,hour=0,minute=00), # 과거로 설정하면, 과거부터 현재까지 실행 안된 부분에 대해 catup하려고함
schedule="0 * * * *",
tags=["example"],
catchup=False, # Full refresh인 경우, False로 두는 것이 좋음
default_args=default_args # common settings # 앞서 지정한 default_args 지정
)
3. Airflow 코드 예제 - Bash operator 사용
- 3개의 태스크로 구성되며,
- t1 - 현재 시간 출력
- t2 - 5초간 대기 후 종료
- t3 - 서버의 /tmp 디렉토리의 내용 출력
- t1이 끝난 뒤, t2와 t3 병렬 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# DAG 대표할 객체 만들기
default_args = {
'owner':'keeyong',
'start_date': datetime(2024,05,25,hour=0,minute=00),
'email':['***@naver.com'],
'retries':1,
'retry_delay':timedelta(minutes=3),
}
# 태스크 만들기
test_dag = DAG(
"dag_v1",
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dat=test_dag)
# 순서
t1 >> [t2,t3]
- DAG trigger - 터미널에서 실행
- 먼저 Airflow 서버 로그인 후 다음 명령 실행
- airflow dags list
- airflow tasks list DAG이름
- airflow tasks test DAG이름 Task이름 날짜 # test vs. run
- 날짜는 YYYY-MM-DD
- start_date보다 과거인 경우는 실행되지만, 오늘 날짜보다 미래인 경우 실행 안됨
- 이 부분이 바로 execution_date의 값이 됨
docker ps # 현재 도커에 띄운 리스트
docker exec -it e1bc7f85eabd sh # 그중 우리는 scheduler id로 실행
airflow dags list # DAG 리스트
airflow tasks list dag_v1 # 태스크 리스트
airflow tasks test dag_v1 ls 2020-08-09 # test vs. run
airflow dags test dag_v1 2019-12-08
airflow dags backfill dag_v1 -s 2019-01-01 -e 2019-12-31