본문 바로가기

languages

[python] 비동기(async)

airflow에서 dag를 짜는데 여러 파일을 s3에서 읽어다가 전처리 후 다시 s3에 적재하는 경우가 많았다. 이 경우 읽고 쓰는데에만 상당한 시간이 걸리는데 파이썬은 GIL(Global Interpreter Lock)때문에 멀티스레딩을 지원하지 않아, 어떻게 이 문제를 풀어나갈 수 있을까 고민하던 찰나에 비동기가 있었지!하고 생각났다

 

비동기란 무엇일까?

 

비동기의 가장 큰 목적은 CPU가 I/O 작업에서 들이는 시간 동안 다른 태스크가 CPU를 사용할 수 있게 해주는 것이다. 본래 동기적으로 코드를 짜면 하나의 태스크가 I/O 작업을 하는 동안 Blocking이 되어 다른 태스크가 CPU를 사용하고 싶어도 사용하지 못하게 되는데, 이때 Blocking을 하지 않고 다른 태스크에게 CPU를 내어줘 아까운 CPU를 더 효율적으로 쓸 수 있게 해준다

 

 

[출처] https://poiemaweb.com/es6-promise

 

 

내가 짠 코드는 다음과 같다.

raw_to_stage라는 하나의 태스크에 s3 raw layer의 raw data 읽어 오기 >> 전처리 >> s3 stage layer에 적재하는 플로우로 이루어져 있으며, 이때 s3에 접근해 파일을 읽고 쓰는 과정에서 비동기를 사용해 태스크가 처리되는 시간을 줄이고자 했다

    async def upload_to_s3(semaphore, s3_client, s3_key_raw, results):
        """
        upload to s3 using s3_client limited by semaphore
        """
        
        col = ['id', 'depAirportCode', 'depCountryCode', 'currencyCode', 'arrAirportCode', 'carrierName', 'depTime', 'arrTime', 'price', 'url', 'extractedDate', 'isExpired']
        df = pd.DataFrame(data=results, columns=col)
            
        parquet_buffer = BytesIO()
        df.to_parquet(parquet_buffer, index=False, engine='pyarrow', use_deprecated_int96_timestamps=True)
        parquet_buffer.seek(0)
            
        s3_key_transformed = s3_key_raw[:-4] + 'parquet'
        s3_bucket_transformed = 'hellokorea-stage-layer'
        
        async with semaphore:
            await s3_client.put_object(
                Bucket = s3_bucket_transformed,
                Key = s3_key_transformed,
                Body = parquet_buffer.getvalue()
            )
        print(f"{s3_key_raw} ended")
           
    async def transform(semaphore, s3_client, s3_key_raw, depAirportCode, depCountryCode, currencyCode, current_date):
        """
        extract data from raw layer
        to store in the stage layer after preprocessing
        """
        response = await s3_client.get_object(
            Bucket='hellokorea-raw-layer',
            Key=s3_key_raw
        )
        file_content = await response['Body'].read()
        df = pd.read_json(StringIO(file_content.decode('utf-8')))

        # preprocessing
        if not df.empty:
            df = pd.json_normalize(df.to_dict(orient='records'))

            tmp_date = datetime.strptime(current_date, '%Y-%m-%d')
            results = []

            print(len(df["id"]), len(df["pricing_options"]))
            for idx, (id, price) in enumerate(zip(df["id"], df["pricing_options"])):
                tmp = list(map(str, id.split('-')))
                depTime = datetime.strptime(f"20{tmp[1][:2]}-{tmp[1][2:4]}-{tmp[1][4:6]} {tmp[1][6:8]}:{tmp[1][8:10]}", "%Y-%m-%d %H:%M")
                carrierCode = '-' + tmp[3]
                carrierName = df.loc[idx][f"_carriers.{carrierCode}.name"]
                arrAirportCode, arrTime = 'ICN', datetime.strptime(f"20{tmp[6][:2]}-{tmp[6][2:4]}-{tmp[6][4:6]} {tmp[6][6:8]}:{tmp[6][8:10]}", "%Y-%m-%d %H:%M")
                cheapest_price = int(round(price[0]["items"][0]["price"]["amount"],0))
                url = price[0]["items"][0]["url"]
                extractedDate = tmp_date
                isExpired = False

                results.append([id, depAirportCode, depCountryCode, currencyCode, arrAirportCode, carrierName, depTime, arrTime, cheapest_price, url, extractedDate, isExpired])
            print(f"{s3_key_raw} started")
            await upload_to_s3(semaphore, s3_client, s3_key_raw, results)
        
    async def extract_from_s3(semaphore, dataset_list, current_date):
    	"""
        extract raw data from s3
        """
        s3_hook = S3Hook(aws_conn_id="s3_conn")
        aws_access_key_id = s3_hook.get_connection(conn_id="s3_conn").login
        aws_secret_access_key = s3_hook.get_connection(conn_id="s3_conn").password
        region_name = "ap-northeast-2"
        
        session = aioboto3.Session()
        async with session.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name
        ) as s3_client:
            tasks = []
            for depAirportCode, depCountryCode, currencyCode, nowSearchDate, datasetId in dataset_list:
                s3_key_raw = f"source/flight/{depAirportCode}_to_ICN_{nowSearchDate}_updated_{current_date}.json"
                task = asyncio.create_task(transform(semaphore, s3_client, s3_key_raw, depAirportCode, depCountryCode, currencyCode, current_date))
                tasks.append(task)
                    
            await asyncio.gather(*tasks)
        logging.info("transformed done")

    @task
    def raw_to_stage(dataset_list, current_date):
        """
        Get raw data from the raw layer, preprocess it,
        and load it into the stage layer
        """
        semaphore = asyncio.Semaphore(20)
        asyncio.run(extract_from_s3(semaphore, dataset_list, current_date))
        
        return dataset_list

 

비동기 코드를 처음 짜보는 거여서 계속 테스트하며 잘 돌아가는지 확인해야 했지만 다 짜고나니 정말 뿌듯했는데, 위 코드로 기존의 파일 쓰기 작업을 대체한 결과 시간이 10분 소요 → 50초 소요로 단축됐다!

 

[왼] 기존 코드 [오] 비동기 코드

 

 

이번 실험을 통해 비동기 코드를 완성하며, 여러 체크 포인트가 있다는 것을 알게 되었다

✅ semaphore로 사용할 수 있는 요청 수를 제어해야 한다. 일정 갯수를 넘어가면 AWS에서 일시적으로 block 한다

✅ s3와 session 연결 후 오랜 시간이 지나 s3에 파일을 넣으려 하면 오류가 발생한다. 예를 들어, 세션 연결 후 데이터 생성 및 처리 시간이 오래 걸리는 경우(내 기억엔 약 15분 이상 걸리는 경우) 세션 연결이 만료되어 s3에 파일을 쓸 수 없게 된다

✅ 데이터 생성 및 처리에 시간이 너무 많은 시간이 소요되는 경우엔, 비동기보다 멀티프로세싱이 더 적합할 수 있다

 

 

[마무리]

단순히 새로운 기술을 사용하는 것은 올바르지 않다. 오늘은 비동기 코드로 내 상황에 맞게 효율성을 높일 수 있었지만 각 상황에 적합한 해결책이 존재한다는 것을 잊지 않고 기술을 대하길 나 자신에게 기대해본다

'languages' 카테고리의 다른 글

[python] 객체지향 프로그래밍  (0) 2024.04.17
[python] 객체와 인스턴스  (0) 2024.04.17
[python] inheritance, overriding, super()  (0) 2024.04.16
[python] is, ==  (0) 2024.01.27