본문 바로가기

data engineering/spark

Apache Spark intro -2 (RDD. Resilient Distributed Dataset)

https://nani-log.tistory.com/174

 

Apache Spark intro -1 (구성 요소와 아키텍처)

Spark는 Mapreduce를 대체하는 강력한 분산 데이터 처리 시스템이다. Mapreduce는 map → shuffle → reduce의 중간 결과를 디스크에 써 디스크 I/O와 네트워크 오버헤드가 매번 발생했는데, Spark는 중간 결과

nani-log.tistory.com

 

앞선 포스터에선 스파크의 구성 요소와 아키텍처를 살펴봤다. 이번 포스트에선 스파크의 핵심 요소인 RDD(Resilient Distributed Dataset)에 대해 살펴볼 것이다

 

스파크는 사용자가 병렬 분산 환경을 의식하지 않고 처리를 기술할 수 있는 것을 목표로 한다. 이를 위해 하나의 서버에서 처리하기 어려운 대규모 데이터셋을 추상적으로 다룰 수 있는 RDD라는 데이터 구조를 제공한다. 즉, 사용자는 병렬 분산 환경에 대한 복잡한 설정 없이 RDD가 제공하는 API를 통해 데이터를 처리할 수 있으며, 스파크는 이를 자동으로 분산 처리해준다

 

 

1. RDD의 구조

 RDD의 이름을 그대로 해석하면 탄력 있는(또는 복구 가능한) 분산 데이터셋이라는 의미다

 

1) Resilient

탄력 있는, 또는 회복 가능한 이라는 뜻을 가진 이 단어가 RDD에 포함된 이유는 실패 상황에서도 복구 가능한 데이터셋이기 때문이다. 스파크는 RDD의 리니지 정보를 통해 RDD가 어떻게 생성되었는지에 대한 기록을 저장해둔다. 이 때문에 실패 상황에서도 스파크는 해당 데이터를 다시 복구할 수 있다

 

2) Distributed

데이터 분산 처리 기술이기 때문에 RDD 내부는 파티션이라는 단위로 나뉘어져 있다. 예를 들어, HDFS 등의 분산 파일시스템에 있는 파일 내용을 RDD로 로드하고, RDD를 가공하는 방식으로 대량의 데이터를 분산 처리할 수 있다

 

[출처] 아파치 스파크 입문

 

위 그림을 보면, 파티션 단위로 구성된 RDD 구조와 HDFS 등의 분산 파일시스템에서 데이터를 가져와 RDD를 생성하고 이를 변환 & 액션했을 때의 흐름을 파악할 수 있다

 

 

 

2. RDD의 기능과 특성

RDD는 크게 변환과 액션이라는 두 종류의 처리를 적용할 수 있으며, 지연 평가라는 특성을 통해 분산 처리의 최적화를 달성한다

 

1) 변환(Transformation)

변환은 RDD를 가공하고 그 결과 새로운 RDD를 얻는 것을 의미한다. 즉, 변환 전의 RDD가 가지고 있던 요소를 가공하거나 필터링해 새로운 RDD를 생성한다

 

변환은 다시 두 종류로 구분할 수 있는데, 

변환 전의 RDD가 가지는 요소를 같은 RDD의 다른 요소와 관계 없이 처리할 수 있는 종류

  • filter : 요소를 필터링한다
  • map : 각 요소에 동일한 처리를 적용한다
  • flatmap : 각 요소에 동일한 처리를 적용하고 여러 개의 요소를 생성한다
  • zip : 파티션 수가 같고, 파티션에 있는 요소의 수도 같은 두 개의 RDD를 조합해 한쪽 요소를 key로, 다른 한쪽 요소를 value로 갖는 key-value 쌍을 만든다

 

변환 전의 RDD가 가지는 요소를 같은 RDD의 다른 요소와 함께 처리하는 종류

key-value 쌍을 요소로 가지는 RDD를 대상으로 같은 키를 가지는 대상을 모으는 경우를 말한다. 이때 스파크는 파티션 단위로 독립해 분산 처리하기 때문에, 서로 다른 파티션에 있는 같은 키를 가지는 요소의 자리를 바꾸는 셔플(shuffle)이 발생한다. 이때, '파티셔너'가 같은 키를 가진 요소들을 한곳에 모으기 위해 변환 수의 RDD 파티션 수와 각 요소의 키를 기준으로 해당 요소를 어느 파티션에 배치할지 결정한다. 이 과정에서 셔플 파티션이 형성된다

  • reduceByKey : 같은 키를 가지는 요소를 집계(aggregation)한다
  • join : 두 개의 RDD에서 같은 키를 가지는 요소끼리 조인한다

 

2) 액션(Action)

변환이 RDD로부터 다른 RDD를 얻는 '데이터 가공'이라면, 액션은 RDD 내용을 바탕으로 데이터를 가공하지 않고 원하는 결과를 얻는 조작이다

  • saveAsTextFile : RDD 내용을 파일로 출력한다
  • count : RDD 요소 수를 센다
  • collect : 각 파티션으로 나눠진 RDD 요소를 드라이버 프로그램으로 모은다

 

3) 지연 평가(Lazy Evaluation)

RDD는 한 대의 머신에서 전부 다룰 수 없을 만큼 크고 많은 데이터인데, 어떻게 드라이버 프로그램에서 RDD를 생성하거나 변환할 수 있을까? 그 비밀은 RDD의 지연 평가에 있다

 

사용자가 RDD의 API를 활용해 데이터 로드, 변환, 액션을 선언하면 그 동작이 드라이버 내에서 이뤄지는게 아니라 잡 형태로 클러스터에서 실행된다. 즉, 드라이버에서 액션이 호출될 때까지 실행이 '지연'된다

 

예를 들어, 아래와 같이 데이터를 생성하고 각 요소에 변환을 적용하는 코드를 선언했다고 해보자. 액션이 실행되기 전까지 스파크는 실제로 연산을 수행하지 않고, 그 과정들을 스케줄링하는데 그친다. 그러나 액션을 만나면, 비로소 데이터를 처리하고 결과를 산출하기 위해 잡을 클러스터에서 실행한다

val textRDD = sc.textFile("/path/to/huge-text")
val mappedRDD = textRDD.map(text => someFunction(text))
val filteredRDD = textRDD.filter(processedText => filterFunction(processedText))

...

// 실제 RDD에 대한 가공을 실행하는 시점
processedRDD.saveAsTextFile("/path/to/result")

 

 

찬찬히 살펴보다 보니, 이 지연 평가라는 특성이 스파크가 데이터 처리를 최적화하는 지점과 아주 밀접하게 연관되어 있다는 것을 알게 됐다. 우리가 수행하고자 하는 동작을 즉시 수행하지 않고, 각 처리에 필요한 정보들을 모아뒀다가 마지막에 최종 스케줄링 후 실행함으로써, 결과적으로 셔플을 최소화하고 네트워크 I/O와 CPU 연산을 최적화할 수 있다

 

 

4) RDD 영속화(Persisting RDD)

영속화?하면 어떤 의미인지 확 와닿지는 않는다. 영속화란 RDD를 메모리에 저장해 반복적으로 사용하거나 여러 작업에서 RDD를 재사용할 수 있도록 하는 기능이다. 특히 반복적인 작업에서 RDD를 메모리에 저장해두면, 데이터를 로드하고 변환하는 과정이 생략되어 전체 실행 시간이 현저히 줄어든다

 

이 기능은 반복적으로 결과를 다시 모델링하는데 쓰는 머신러닝 작업이나 대규모 데이터 처리에서 특히 유용하다

 

 

 

정리

이번 포스트에선 스파크가 RDD를 통해 어떻게 사용자에게 분산 데이터 처리를 제공할 수 있는지에 대해 살펴봤다. 스파크는 RDD의 분산 컬렉션과 지연 평가라는 특성을 활용해 사용자가 복잡한 내부 구조를 신경쓰지 않고 대규모 데이터를 처리할 수 있게 도와준다. 결국 분산 시스템은 여러 컴퓨터로 구성된 시스템이고, 그 자원을 어떻게 효율적으로 활용하느냐가 핵심이 아닐까 생각된다

 

다음 포스트에서는 RDD의 사용성을 높이기 위해 등장한 Dataframe과 Dataset에 대해 알아보려한다

 

 

 

[출처] 아파치 스파크 입문