1. Airflow 코딩 예제
- Operators - PythonOperator
from airflow.operators.python import PythonOperator
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table':'delighted_nps',
'schema':'raw_data'
},
)
# 함수 정의
def python_func(**cxt): # cxt도 딕셔너리 형태
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# do what you need to do
...
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = "helloWorld",
start_date = datetime(2024,5,5),
catchup=False,
tags=['example'],
schedule='0 2 * * *'
)
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
# 각 함수에 태스크 id를 만들어줌
print_hello = PythonOperator(
task_id = 'print_hello',
python_callable = print_hello,
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
# 순서 지정
print_hello >> print_goodbye
- Task Decorators - 프로그래밍이 더 단순해짐
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2024,5,5),
catchup = False,
tags = ['example'],
schedule = '0 2 * * *'
)as dag:
# 순서 지정
print_hello() >> print_goodbye() # 함수 이름 자체가 태스크 id가 됨
2. 중요한 DAG parameters - DAG 객체를 만들 대 지정해야 하는 것
- DAG parameters vs. Task parameters 차이점을 잘 이해해야함!
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2024,5,5),
catchup = False,
tags = ['example'],
schedule = '0 2 * * *'
)as dag:
- max_active_runs : # of DAGs instance
- max_active_tasks : # of tasks that can run in parallel (upperbound는 현재 airflow에 할당된 CPU 총합)
- catchup : whether to backfill past runs (incremental update 시에만 중요)
3. Colab Python 코드를 Airflow로 포팅하기
- 지난 시간에 짰던 NameGenderCSVtoRedshift.py
## learn-airflow/dags/NameGenderCSVtoRedshift.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
user = "***"
password = "***"
port = 5439
dbname = "dev"
conn = psycopg2.connect(f'dbname={dbname} user={user} host={host} password={password} port={port}')
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract dond")
return (f.text)
def transform(text):
logging.info("Transform started")
lines = text.strip().split("\n")[1:]
records = []
for l in lines:
(name, gender) = l.split(",")
records.append([name, gender])
logging.info("Transform ended")
return records
def load(records):
logging.info("load started")
"""
records = [
[ "Yejin", "F" ],
[ "Claire", "F" ],
...
]
"""
schema = "***"
# BEGIN, END로 트랜잭션 만들어 주는게 더 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM 먼저 수행. -> FULL REFRESH
# DELETE FROM을 하면 ROLLBACK 가능. TRUNCATE는 불가능
for r in records:
name = r[0]
gender = r[1]
print(name,"-",gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # or cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
def etl():
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
dag_second_assignment = DAG(
dag_id = 'name_gender',
catchup = False,
start_date = datetime(2024,5,5),
schedule = '0 2 * * *')
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
dag = dag_second_assignment)
- 코드 개선
- #1. NameGenderCSVtoRedshift_v2.py
- params를 통해 변수 넘기기
- execution_date 얻어내기
- "DELETE FROM" vs. "TRUNCATE"
- DELETE FROM raw_data.name_gender; -- WHERE 사용 가능
- TRUNCATE raw_data.name_gender;
- #2.
- #3. NameGenderCSVtoRedshift_v3.py
- Variable 이용해 CSV parameter 넘기기
- NameGenderCSVtoRedshift_v3.py#L72
- Xcom을 사용해 3개의 태스크로 나눠보기
- 태스크(Operator)들간에 데이터를 주고 받기 위한 방식
- 보통 한 Operator의 리턴값을 다른 Operator에서 읽어가는 형태
- 이 값들은 Airflow 메타데이터 DB에 저장이 됨. 따라서 큰 데이터를 주고 받는 데에선 사용 불가(큰 데이터는 S3 등에 로드하고 그 위치를 넘김)
- #4. NameGenderCSVtoRedshift_v4.py
- Connections 사용
- #5. NameGenderCSVtoRedshift_v5.py
- from airflow.decorators import task
- task decorator를 사용
- 이 경우, xcom을 사용할 필요가 없음
- 기본적으로 PythonOperator 대신에 airflow.decorators.task를 사용함
- from airflow.decorators import task
- #1. NameGenderCSVtoRedshift_v2.py
#4. NameGenderCSVtoRedshift_v4.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
# from plugins import slack
import requests
import logging
import psycopg2
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
def transform(**context):
logging.info("Transform started")
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(**context):
logging.info("load started")
schema = context["params"]["schema"]
table = context["params"]["table"]
records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
dag = DAG(
dag_id = 'name_gender_v4',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
)
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'keeyong', ## 자신의 스키마로 변경
'table': 'name_gender'
},
dag = dag)
extract >> transform >> load
#5. NameGenderCSVtoRedshift_v5.py
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def extract(url):
logging.info(datetime.utcnow())
f = requests.get(url)
return f.text
@task
def transform(text):
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
with DAG(
dag_id='namegender_v5',
start_date=datetime(2022, 10, 6), # 날짜가 미래인 경우 실행이 안됨
schedule='0 2 * * *', # 적당히 조절
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
url = Variable.get("csv_url")
schema = 'keeyong' ## 자신의 스키마로 변경
table = 'name_gender'
lines = transform(extract(url))
load(schema, table, lines)
4. Connections and Variables
5. Airflow 관련 QnA
- PostgresHook의 autocommit
- Default = False
- 이 경우엔 BEGIN은 아무 영향이 없음. (no-operation)
- DAG에서 Task를 어느 정도로 분리하는 것이 좋나?
- Task 과도하게 늘릴 때 → 전체 DAG 실행 시간이 오래 걸리고, 스케줄러에 부하가 감
- Task 과도하게 줄일 때 → 모듈화가 안되고, 실패시 재실행에 시간이 오래 걸림
- 당연히 밸런스 있게 조절해야함! 기준은 실패시 재실행되는 시간의 관점에서 task를 나누는 것이 좋음(감각을 익히게 될 것임)
- Airflow의 Variable 관리 vs. 코드 관리
- 장점 - 코드 푸시의 필요성이 없음
- 단점 - 관리나 테스트가 안되어, 사고로 이어질 수 있음(굉장히 중요한 코드라면 variable에 넣지 않는 것이 좋음)
6. Yahoo Finance API DAG 작성 - Full Refresh
7. Yahoo Finance API DAG 작성 - Incremental Update
- 임시 테이블 생성하면서, 현재 테이블의 레코드를 복사 (CREATE TEMP TABLE ... AS SELECT)
- 임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
- 원본 테이블을 삭제하고 새로 생성
- 원본 테이블에 임시 테이블의 내용을 복사 (이때, SELECT DISTINCT *를 사용해 중복 제거)
8. 숙제
'dev course - DE > TIL' 카테고리의 다른 글
[데브코스] TIL 44일차 (0) | 2024.05.27 |
---|---|
[데브코스] TIL 42일차 (0) | 2024.05.26 |
[데브코스] TIL 41일차 (0) | 2024.05.21 |
[데브코스] TIL 35일차 (0) | 2024.05.11 |
[데브코스] TIL 34일차 (0) | 2024.05.10 |