Python Operator과 Task Decorator의 차이
- 버전 차이
- Python Operator - Airflow 1.x부터 사용 가능한 전통적인 방식
- Task Decorator - Airflow 2.0에서 도입된 기능으로, 최신 기능과의 호환성을 고려할 때 유용한 방식
- 코드 구조의 차이
- Python Operator - 함수를 정의한 후 'PythonOperator'을 통해 해당 함수를 실행해야함
- Task Decorator - 함수 정의에 바로 @Task(Decorator)을 사용해 생성하므로, 함수 정의 + Task 생성이 하나의 블록으로 결합됨
[예제 소개] NameGender csv 파일을 Redshift로 옮기는 코드
1. Python Operator로 구현
1-1) 라이브러리 소개
- DAG
- PythonOperator
- Python callables를 호출함
- callables란 호출 가능한 클래스 인스턴스, 함수, 메서드 등의 객체를 의미
- Variable
- Airflow 전역에서 사용할 수 있는 값을 미리 저장해두고, DAG에서 공통적으로 사용하는 변수
- webserver UI에서 쉽게 설정 가능하며, key-value방식으로 구성됨
- PostgresHook
- Postgres와 연결
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
1-2) DAG를 대표하는 객체를 만듦
- 이름, 실행 주기, 실행 날짜, 오너 등
- 실행 주기
- Unix 기반 OS 스케줄러인 cron 구문을 지원함
- * * * * *
- minute(0-59), hour(0-23), day of month(1-31), month(1-12), day of week(0-6. 0: sun)
- 실행 주기
dag = DAG(
dag_id = 'name_gender_v4',
start_date = datetime(2024,5,5), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *',
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
1-3) Redshift와 연결
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
1-4) DAG를 구성하는 태스크 함수를 정의
- Extract
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
- Transform
- xcom
- 태스크(Operator) 간에 데이터를 전달하기 위해 사용됨
- 보통 한 Operator의 리턴값을 다른 Operator에서 읽어가는 형태
- 이 값들은 Airflow 메타데이터 DB에 저장이 됨. 따라서 큰 데이터를 주고 받는 데에선 사용 불가(큰 데이터는 S3 등에 로드하고 그 위치를 넘김)
- xcom
def transform(**context):
logging.info("Transform started")
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[:1]
records = []
for l in lines:
(name, gender) = l.split(",")
records.append([name, gender])
logging.info("Transform ended")
return records
- Load
def load(**context):
logging.info("load started")
schema = context["params"]["schema"]
table = context["params"]["table"]
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
records = context["task_instance"].xcom_pull(key="return_value", taskids="transform")
# BEGIN, END로 트랜잭션
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM 먼저 수행. FULL REFRESH 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
raise # 꼭 ! 안하면 오류 발생시 모름
logging.info("load done")
- ETL을 각각 태스크로 만들고 순서를 결정함
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag=dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag=dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': '', # 자신의 스키마 이름
'table': 'name_gender'
},
dag=dag)
extract >> transform >> load
2. Task Decorator로 구현 - 일반적으로 더 많이 사용함
2-1) 라이브러리 소개
- Task Decorator
- 프로그래밍이 더 단순해짐
- 데코레이터? 함수 앞뒤에 기능을 추가해 손쉽게 함수를 활용하는 기법
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
2-2) DAG를 대표하는 객체를 만들고, 태스크 간의 순서를 동시에 결정함
- 이름, 실행 주기, 실행 날짜, 오너 등
- 실행 주기
- Unix 기반 OS 스케줄러인 cron 구문을 지원함
- * * * * *
- minute(0-59), hour(0-23), day of month(1-31), month(1-12), day of week(0-6. 0: sun)
- 실행 주기
with DAG(
dag_id='namegender_v5',
start_date=datetime(2022, 10, 6), # 날짜가 미래인 경우 실행이 안됨
schedule='0 2 * * *', # 적당히 조절
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
url = Variable.get("csv_url")
schema = '', # 자신의 스키마 이름
table = 'name_gender'
lines = transform(extract(url))
load(schema, table, lines)
2-3) Redshift와 연결
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
2-4) DAG를 구성하는 태스크들을 만듦. 이때 데코레이터를 사용함으로 함수를 정의하는 동시에 태스크 생성
- Extract
@task
def extract(url):
logging.info(datetime.utcnow())
f = requests.get(url)
return f.text
- Transform
@task
def transform(text):
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
- Load
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN, END로 트랜잭션
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행. FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
'data engineering > airflow' 카테고리의 다른 글
[Airflow] 로그 파일 관리 (0) | 2024.06.17 |
---|---|
[Airflow] DAG의 기본 구조 (0) | 2024.06.03 |
[Airflow] Airflow는 왜 등장했을까? + 구성 요소 (0) | 2024.06.03 |