airflow에서 dag를 짜는데 여러 파일을 s3에서 읽어다가 전처리 후 다시 s3에 적재하는 경우가 많았다. 이 경우 읽고 쓰는데에만 상당한 시간이 걸리는데 파이썬은 GIL(Global Interpreter Lock)때문에 멀티스레딩을 지원하지 않아, 어떻게 이 문제를 풀어나갈 수 있을까 고민하던 찰나에 비동기가 있었지!하고 생각났다
비동기란 무엇일까?
비동기의 가장 큰 목적은 CPU가 I/O 작업에서 들이는 시간 동안 다른 태스크가 CPU를 사용할 수 있게 해주는 것이다. 본래 동기적으로 코드를 짜면 하나의 태스크가 I/O 작업을 하는 동안 Blocking이 되어 다른 태스크가 CPU를 사용하고 싶어도 사용하지 못하게 되는데, 이때 Blocking을 하지 않고 다른 태스크에게 CPU를 내어줘 아까운 CPU를 더 효율적으로 쓸 수 있게 해준다
내가 짠 코드는 다음과 같다.
raw_to_stage라는 하나의 태스크에 s3 raw layer의 raw data 읽어 오기 >> 전처리 >> s3 stage layer에 적재하는 플로우로 이루어져 있으며, 이때 s3에 접근해 파일을 읽고 쓰는 과정에서 비동기를 사용해 태스크가 처리되는 시간을 줄이고자 했다
async def upload_to_s3(semaphore, s3_client, s3_key_raw, results):
"""
upload to s3 using s3_client limited by semaphore
"""
col = ['id', 'depAirportCode', 'depCountryCode', 'currencyCode', 'arrAirportCode', 'carrierName', 'depTime', 'arrTime', 'price', 'url', 'extractedDate', 'isExpired']
df = pd.DataFrame(data=results, columns=col)
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False, engine='pyarrow', use_deprecated_int96_timestamps=True)
parquet_buffer.seek(0)
s3_key_transformed = s3_key_raw[:-4] + 'parquet'
s3_bucket_transformed = 'hellokorea-stage-layer'
async with semaphore:
await s3_client.put_object(
Bucket = s3_bucket_transformed,
Key = s3_key_transformed,
Body = parquet_buffer.getvalue()
)
print(f"{s3_key_raw} ended")
async def transform(semaphore, s3_client, s3_key_raw, depAirportCode, depCountryCode, currencyCode, current_date):
"""
extract data from raw layer
to store in the stage layer after preprocessing
"""
response = await s3_client.get_object(
Bucket='hellokorea-raw-layer',
Key=s3_key_raw
)
file_content = await response['Body'].read()
df = pd.read_json(StringIO(file_content.decode('utf-8')))
# preprocessing
if not df.empty:
df = pd.json_normalize(df.to_dict(orient='records'))
tmp_date = datetime.strptime(current_date, '%Y-%m-%d')
results = []
print(len(df["id"]), len(df["pricing_options"]))
for idx, (id, price) in enumerate(zip(df["id"], df["pricing_options"])):
tmp = list(map(str, id.split('-')))
depTime = datetime.strptime(f"20{tmp[1][:2]}-{tmp[1][2:4]}-{tmp[1][4:6]} {tmp[1][6:8]}:{tmp[1][8:10]}", "%Y-%m-%d %H:%M")
carrierCode = '-' + tmp[3]
carrierName = df.loc[idx][f"_carriers.{carrierCode}.name"]
arrAirportCode, arrTime = 'ICN', datetime.strptime(f"20{tmp[6][:2]}-{tmp[6][2:4]}-{tmp[6][4:6]} {tmp[6][6:8]}:{tmp[6][8:10]}", "%Y-%m-%d %H:%M")
cheapest_price = int(round(price[0]["items"][0]["price"]["amount"],0))
url = price[0]["items"][0]["url"]
extractedDate = tmp_date
isExpired = False
results.append([id, depAirportCode, depCountryCode, currencyCode, arrAirportCode, carrierName, depTime, arrTime, cheapest_price, url, extractedDate, isExpired])
print(f"{s3_key_raw} started")
await upload_to_s3(semaphore, s3_client, s3_key_raw, results)
async def extract_from_s3(semaphore, dataset_list, current_date):
"""
extract raw data from s3
"""
s3_hook = S3Hook(aws_conn_id="s3_conn")
aws_access_key_id = s3_hook.get_connection(conn_id="s3_conn").login
aws_secret_access_key = s3_hook.get_connection(conn_id="s3_conn").password
region_name = "ap-northeast-2"
session = aioboto3.Session()
async with session.client(
's3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
) as s3_client:
tasks = []
for depAirportCode, depCountryCode, currencyCode, nowSearchDate, datasetId in dataset_list:
s3_key_raw = f"source/flight/{depAirportCode}_to_ICN_{nowSearchDate}_updated_{current_date}.json"
task = asyncio.create_task(transform(semaphore, s3_client, s3_key_raw, depAirportCode, depCountryCode, currencyCode, current_date))
tasks.append(task)
await asyncio.gather(*tasks)
logging.info("transformed done")
@task
def raw_to_stage(dataset_list, current_date):
"""
Get raw data from the raw layer, preprocess it,
and load it into the stage layer
"""
semaphore = asyncio.Semaphore(20)
asyncio.run(extract_from_s3(semaphore, dataset_list, current_date))
return dataset_list
비동기 코드를 처음 짜보는 거여서 계속 테스트하며 잘 돌아가는지 확인해야 했지만 다 짜고나니 정말 뿌듯했는데, 위 코드로 기존의 파일 쓰기 작업을 대체한 결과 시간이 10분 소요 → 50초 소요로 단축됐다!
이번 실험을 통해 비동기 코드를 완성하며, 여러 체크 포인트가 있다는 것을 알게 되었다
✅ semaphore로 사용할 수 있는 요청 수를 제어해야 한다. 일정 갯수를 넘어가면 AWS에서 일시적으로 block 한다
✅ s3와 session 연결 후 오랜 시간이 지나 s3에 파일을 넣으려 하면 오류가 발생한다. 예를 들어, 세션 연결 후 데이터 생성 및 처리 시간이 오래 걸리는 경우(내 기억엔 약 15분 이상 걸리는 경우) 세션 연결이 만료되어 s3에 파일을 쓸 수 없게 된다
✅ 데이터 생성 및 처리에 시간이 너무 많은 시간이 소요되는 경우엔, 비동기보다 멀티프로세싱이 더 적합할 수 있다
[마무리]
단순히 새로운 기술을 사용하는 것은 올바르지 않다. 오늘은 비동기 코드로 내 상황에 맞게 효율성을 높일 수 있었지만 각 상황에 적합한 해결책이 존재한다는 것을 잊지 않고 기술을 대하길 나 자신에게 기대해본다
'languages' 카테고리의 다른 글
[python] 객체지향 프로그래밍 (0) | 2024.04.17 |
---|---|
[python] 객체와 인스턴스 (0) | 2024.04.17 |
[python] inheritance, overriding, super() (0) | 2024.04.16 |
[python] is, == (0) | 2024.01.27 |