1. Open Weather DAG 구현하기
- Open Weahtermap API
- https://openweathermap.org/price
- 위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
- 무료 계정으로 API key를 받아, 이를 호출시에 사용함
- 만드려는 DAG : 서울 8일 낮/최소/최대 온도 읽기
- 먼저, Open weathermap에 각자 등록하고 자신의 API key를 다운로드
- API key를 open_weather_api_key라는 variable로 저장
- 서울의 위도 경도 찾기
- One-call API 사용 - 서울의 위도 경도를 이용해 requests 모듈로 호출
- https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}&units=metric
- 응답 결과에서 온도 정보만 앞으로 7일을 대상으로 출력해볼 것
- 구현 1) Full Refresh
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connction():
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table):
api_key = Variable.get("open_weather_api_key")
# 서울 위도, 경도
lat = 37.5665
lon = 126.9780
# api 연결
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
ret = []
for d in data["daily"]:
dat = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day,d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_conneciton()
drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table};
CREATE TABLE {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
"""
insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret)
logging.info(drop_recreate_sql)
logging.info(insert_sql)
try:
cur.execute(drop_recreate_sql)
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
with DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("kcmclub22", "weather_forecast")
- 구현 2) Incremental Update
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
@task
def etl(schema, table, lat, lon, api_key):
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}&units=metric"
response = requests.get(url)
data = json.loads(response.text)
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
# 원본 테이블이 없을때, 생성
create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
logging.info(create_table_sql)
# 임시 테이블 생성
create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
logging.info(create_t_sql)
try:
cur.execute(create_table_sql)
cur.execute(create_t_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 임시 테이블에 데이터 입력 # 중복 레코드 존재 가능
insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
logging_info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# 기존 테이블 대체 # 최신 레코드만 선택해 삽입
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM(
SELECT *, ROW_NUMBER OVER(PARTITION BY date ORDER BY created_date DESC)
FROM t
)
WHERE seq = 1;"""
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
with DAG(
dag_id = 'Weather_to_Redshift_v2'.
start_date = datetime(2024,5,5), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 4 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
etl("kcmclub22", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
2. Primary Key uniqueness 보장하기
- ! 빅데이터 기반 데이터 웨어하우스들은 Primary Key를 지켜주지 않음 !
- 왜? 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터 적재가 걸림돌이 됨
- 이를 보장하는 것은 데이터 인력의 책임
CREATE TABLE kcmclub22.test(
date date primary key,
value bigint
);
INSERT INTO kcmclub22.test VALUES ('2024-05-05', 100);
INSERT INTO kcmclub22.test VALUES ('2024-05-05', 150); -- 이 작업이 성공해버림...
- 빅데이터 기반 데이터 웨어하우스 사용시, Primary Key 유지 방법 (예시를 통해서 알아보자)
- CREATE TABLE시 최신 정보를 알 수 있는 필드를 기록함
- 새로운 컬럼(seq)를 추가한 뒤 date 별로 created_date 역순으로 ROW_NUMBER를 이용해 순서 구현
- 임시 temp 테이블을 만듦
- 현재 모든 레코드를 복사
- 새로 데이터 소스에서 읽어들인 레코드를 복사(중복 존재 가능)
- 중복을 걸러주는 SQL 작성(최신 레코드를 우선 순위로 선택)
- 위 SQL을 바탕으로 최종 원본 테이블로 복사(원본 테이블 레코드를 삭제하고, 임시 temp 테이블을 원본 테이블로 복사)
-- 날씨 정보이기 때문에, 최신 정보를 더 신뢰할 수 있음
-- 어느 정보가 더 최신 정보인지 created_date 필드에 기록하고 이를 활용
-- date이 같은 레코드들이 있다면, created_date을 기준으로 더 최근 정보를 선택함
CREATE TABLE kcmclub22.weather_forecast(
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
ROW_NUMBER() OVER (partition by date order by created_date DESC) seq
- Upsert
- Primary Key를 기준으로 존재하는 레코드라면, 새 정보로 수정
- 존재하지 않는 레코드라면, 새 레코드로 적재
- 보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원함
- Operator 사용
- 뒤에서 MySQL to Redshift DAG 구현시 사용할 예정
3. Backfill
(예시) 하루에 한번 동작하고, Incremental Update하는 파이프라인이라고 하자. 이틀 전에 실패한 것을 오늘 알았다고 가정했을 때, 실패한 부분을 다시 재실행해야하는 상황이라고 하자
- 정의
- 실패한 데이터 파이프라인을 재실행 또는 읽어온 데이터들의 문제로 다시 읽어와야하는 경우
- 가능하면 Full Refresh를 사용하는 것이 좋음 - 문제가 생겨도 다시 실행하면 됨
- Incremental Update 사용시 - 효율성은 좋지만, 운영/유지보수의 난이도가 올라감
- 실패한 데이터 파이프라인을 재실행 또는 읽어온 데이터들의 문제로 다시 읽어와야하는 경우
- 용이성
- 실패한 파이프라인의 재실행이 얼마나 용이한 구조인가?
- 이는 데이터 엔지니어의 삶에 직접적인 영향을 줌. 운영/유지보수의 난이도.
- 이 backfill이 잘 디자인된 것이 바로 Airflow
- backfill 용이성을 위해서는, 어떻게 ETL을 구현해야할까?
- 시스템적으로 구현
- 날짜별로 backfill 결과를 기록하고 성공 여부를 기록해둠. 나중에 결과 확인이 쉬워짐
- 이 날짜를 시스템에서 ETL 인자로 제공
- 데이터 엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고, 시스템이 지정해준 날짜를 사용함
- Airflow의 접근 방식
- ETL 별로 실행 날짜와 결과를 메타데이터 데이터베이스에 기록함
- 모든 DAG 실행에 "execution_date"이 지정되어 있음
- 이를 바탕으로 데이터를 갱신하도록 코드를 작성하면 backfill이 쉬워짐
- 시스템적으로 구현
- 구현 예시 - Daily Incremental Update
- start_date : 시작 날짜라기보다, 처음 읽어와야하는 데이터의 날짜
- execution_date : 읽어와야하는 데이터의 날짜로 설정됨
- (예시) 현재 기준, 총 3번이 실행됨. 11,12,13일
- backfill과 관련된 Airflow 변수
- start_date : DAG가 처음 실행되는 날짜가 아닌, DAG가 처음 읽어와야하는 데이터의 날짜/시간. 실제 첫 실행 날짜(execution_date)는 start_date + DAG의 실행주기
- execution_date : DAG가 읽어와야하는 데이터의 날짜와 시간
- catchup : DAG가 처음 활성화된 시점이 start_date보다 미래라면, 그 사이에 실행이 안된 것들을 어떻게 할 것인지 결정해주는 파라미터. True가 default이며, 이 경우 실행이 안된 것들을 모두 따라 잡으려 하기 때문에 필요한 경우가 아니면 False로 두는 것이 안전함
- end_date : 이 값은 보통 필요하지 않으며, backfill을 날짜 범위에 대해 하는 경우에만 필요함 (airflow dags backfill -s ... -e ...)
+) Airflow 요소를 잘 이해하지 못한 눈물의 사연
- 상황 설명
- 최적화되지 않은 Airflow configuration
- start_date, catchup=True, ...
- 엄청 큰 쿼리
- BigQuery/Snowflake - 가변비용
- 최적화되지 않은 Airflow configuration
- 2천불짜리 쿼리가 Airflow에 의해 8번 스케줄됨 = 1만6천불(약 2,200만원)
- catchup은 기본적으로 True가 default. 이를 catchup=False로 하는 것이 안전
+) airflow - timezone
- airflow.cfg에 두 종류의 타임존 관련 키 존재
- default_timezone
- default_ui_timezone
- start_date, end_date, schedule
- default_timezone에 지정된 타임존을 따름
- execution_date와 로그 시간
- 항상 UTC를 따름
- execution_date를 사용할 때는, 타임존을 고려해 변환 후 사용 필요
+) dags 폴더에서 코딩시 주의할 점
- Airflow는 dags 폴더를 주기적으로 스캔하리 때문에, DAG 모듈이 들어있는 모든 파일들의 메인 함수가 실행됨
- 본의 아니게 개발 중인 테스트 코드도 실행될 수 있음
from airflow import DAG
...
cur.execute("DELETE FROM ...")
'dev course - DE > TIL' 카테고리의 다른 글
[데브코스] TIL 43일차 (0) | 2024.05.27 |
---|---|
[데브코스] TIL 42일차 (0) | 2024.05.26 |
[데브코스] TIL 41일차 (0) | 2024.05.21 |
[데브코스] TIL 35일차 (0) | 2024.05.11 |
[데브코스] TIL 34일차 (0) | 2024.05.10 |