본문 바로가기

dev course - DE/TIL

[데브코스] TIL 43일차

1. Airflow 코딩 예제

  • Operators - PythonOperator
from airflow.operators.python import PythonOperator

load_nps = PythonOperator(
    dag=dag,
    task_id='task_id',
    python_callable=python_func,
    params={
        'table':'delighted_nps',
        'schema':'raw_data'
    },
)

# 함수 정의
def python_func(**cxt): # cxt도 딕셔너리 형태
    table = cxt["params"]["table"]
    schema = cxt["params"]["schema"]
    
    ex_date = cxt["execution_date"]
    
    # do what you need to do
    ...
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id = "helloWorld",
    start_date = datetime(2024,5,5),
    catchup=False,
    tags=['example'],
    schedule='0 2 * * *'
)

def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

# 각 함수에 태스크 id를 만들어줌
print_hello = PythonOperator(
    task_id = 'print_hello',
    python_callable = print_hello,
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag)

# 순서 지정
print_hello >> print_goodbye
  • Task Decorators - 프로그래밍이 더 단순해짐
from airflow.decorators import task

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2024,5,5),
    catchup = False,
    tags = ['example'],
    schedule = '0 2 * * *'
)as dag:
    
    # 순서 지정
    print_hello() >> print_goodbye() # 함수 이름 자체가 태스크 id가 됨

 

 

 

 

2. 중요한 DAG parameters - DAG 객체를 만들 대 지정해야 하는 것

  • DAG parameters vs. Task parameters 차이점을 잘 이해해야함!
with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2024,5,5),
    catchup = False,
    tags = ['example'],
    schedule = '0 2 * * *'
)as dag:
  • max_active_runs : # of DAGs instance
  • max_active_tasks : # of tasks that can run in parallel (upperbound는 현재 airflow에 할당된 CPU 총합)
  • catchup : whether to backfill past runs (incremental update 시에만 중요)

 

 

3. Colab Python 코드를 Airflow로 포팅하기

  • 지난 시간에 짰던 NameGenderCSVtoRedshift.py
## learn-airflow/dags/NameGenderCSVtoRedshift.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

import requests
import logging
import psycopg2

def get_Redshift_connection():
    host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
    user = "***"
    password = "***"
    port = 5439
    dbname = "dev"
    conn = psycopg2.connect(f'dbname={dbname} user={user} host={host} password={password} port={port}')
    conn.set_session(autocommit=True)
    return conn.cursor()

def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract dond")
    return (f.text)

def transform(text):
    logging.info("Transform started")
    lines = text.strip().split("\n")[1:]
    records = []
    for l in lines:
        (name, gender) = l.split(",")
        records.append([name, gender])
    logging.info("Transform ended")
    return records
    
def load(records):
    logging.info("load started")
    """
    records = [
      [ "Yejin", "F" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    schema = "***"
    
    # BEGIN, END로 트랜잭션 만들어 주는게 더 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;")
        # DELETE FROM 먼저 수행. -> FULL REFRESH
        # DELETE FROM을 하면 ROLLBACK 가능. TRUNCATE는 불가능
        
        for r in records:
        	name = r[0]
            gender = r[1]
            print(name,"-",gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;") # or cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
    logging.info("load done")

def etl():
    link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
    data = extract(link)
    lines = transform(data)
    load(lines)
    
dag_second_assignment = DAG(
    dag_id = 'name_gender',
    catchup = False,
    start_date = datetime(2024,5,5),
    schedule = '0 2 * * *')

task = PythonOperator(
    task_id = 'perform_etl',
    python_callable = etl,
    dag = dag_second_assignment)
  • 코드 개선
    • #1. NameGenderCSVtoRedshift_v2.py
      • params를 통해 변수 넘기기
      • execution_date 얻어내기
      • "DELETE FROM" vs. "TRUNCATE"
        • DELETE FROM raw_data.name_gender; -- WHERE 사용 가능
        • TRUNCATE raw_data.name_gender;
    • #2. 
    • #3. NameGenderCSVtoRedshift_v3.py
      • Variable 이용해 CSV parameter 넘기기
      • NameGenderCSVtoRedshift_v3.py#L72
      • Xcom을 사용해 3개의 태스크로 나눠보기
        • 태스크(Operator)들간에 데이터를 주고 받기 위한 방식
        • 보통 한 Operator의 리턴값을 다른 Operator에서 읽어가는 형태
        • 이 값들은 Airflow 메타데이터 DB에 저장이 됨. 따라서 큰 데이터를 주고 받는 데에선 사용 불가(큰 데이터는 S3 등에 로드하고 그 위치를 넘김)
    • #4. NameGenderCSVtoRedshift_v4.py
      • Connections 사용
    • #5. NameGenderCSVtoRedshift_v5.py
      • from airflow.decorators import task
        • task decorator를 사용
        • 이 경우, xcom을 사용할 필요가 없음
        • 기본적으로 PythonOperator 대신에 airflow.decorators.task를 사용함
#4. NameGenderCSVtoRedshift_v4.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta
# from plugins import slack

import requests
import logging
import psycopg2



def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


def extract(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)


def transform(**context):
    logging.info("Transform started")    
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(**context):
    logging.info("load started")    
    schema = context["params"]["schema"]
    table = context["params"]["table"]
    
    records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")    
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise
    logging.info("load done")


dag = DAG(
    dag_id = 'name_gender_v4',
    start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 2 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
)


extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url")
    },
    dag = dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = { 
    },  
    dag = dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': 'keeyong',   ## 자신의 스키마로 변경
        'table': 'name_gender'
    },
    dag = dag)

extract >> transform >> load
#5. NameGenderCSVtoRedshift_v5.py
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task

from datetime import datetime
from datetime import timedelta

import requests
import logging


def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


@task
def extract(url):
    logging.info(datetime.utcnow())
    f = requests.get(url)
    return f.text


@task
def transform(text):
    lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
    records = []
    for l in lines:
      (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
      records.append([name, gender])
    logging.info("Transform ended")
    return records


@task
def load(schema, table, records):
    logging.info("load started")    
    cur = get_Redshift_connection()   
    """
    records = [
      [ "Keeyong", "M" ],
      [ "Claire", "F" ],
      ...
    ]
    """
    # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") 
        # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;") 
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")   
    logging.info("load done")


with DAG(
    dag_id='namegender_v5',
    start_date=datetime(2022, 10, 6),  # 날짜가 미래인 경우 실행이 안됨
    schedule='0 2 * * *',  # 적당히 조절
    max_active_runs=1,
    catchup=False,
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        # 'on_failure_callback': slack.on_failure_callback,
    }
) as dag:

    url = Variable.get("csv_url")
    schema = 'keeyong'   ## 자신의 스키마로 변경
    table = 'name_gender'

    lines = transform(extract(url))
    load(schema, table, lines)

 

 

 

 

4. Connections and Variables

 

 

 

5. Airflow 관련 QnA

  • PostgresHook의 autocommit
    • Default = False
    • 이 경우엔 BEGIN은 아무 영향이 없음. (no-operation)
  • DAG에서 Task를 어느 정도로 분리하는 것이 좋나?
    • Task 과도하게 늘릴 때  → 전체 DAG 실행 시간이 오래 걸리고, 스케줄러에 부하가 감
    • Task 과도하게 줄일 때 → 모듈화가 안되고, 실패시 재실행에 시간이 오래 걸림
    • 당연히 밸런스 있게 조절해야함! 기준은 실패시 재실행되는 시간의 관점에서 task를 나누는 것이 좋음(감각을 익히게 될 것임)
  • Airflow의 Variable 관리 vs. 코드 관리
    • 장점 - 코드 푸시의 필요성이 없음
    • 단점 - 관리나 테스트가 안되어, 사고로 이어질 수 있음(굉장히 중요한 코드라면 variable에 넣지 않는 것이 좋음)

 

 

 

6. Yahoo Finance API DAG 작성 - Full Refresh

 

 

 

 

7. Yahoo Finance API DAG 작성 - Incremental Update

  • 임시 테이블 생성하면서, 현재 테이블의 레코드를 복사 (CREATE TEMP TABLE ... AS SELECT)
  • 임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
  • 원본 테이블을 삭제하고 새로 생성
  • 원본 테이블에 임시 테이블의 내용을 복사 (이때, SELECT DISTINCT *를 사용해 중복 제거)

 

 

8. 숙제

 

'dev course - DE > TIL' 카테고리의 다른 글

[데브코스] TIL 44일차  (0) 2024.05.27
[데브코스] TIL 42일차  (0) 2024.05.26
[데브코스] TIL 41일차  (0) 2024.05.21
[데브코스] TIL 35일차  (0) 2024.05.11
[데브코스] TIL 34일차  (0) 2024.05.10