data engineering/spark

Apche Spark intro -3 (RDD, DataFrame, Dataset)

nani-jin 2024. 10. 16. 19:41

https://nani-log.tistory.com/175

 

Apache Spark intro -2 (RDD. Resilient Distributed Dataset)

https://nani-log.tistory.com/174 Apache Spark intro -1 (구성 요소와 아키텍처)Spark는 Mapreduce를 대체하는 강력한 분산 데이터 처리 시스템이다. Mapreduce는 map → shuffle → reduce의 중간 결과를 디스크에 써 디스

nani-log.tistory.com

앞선 포스트에선 RDD의 구조와 기능, 특성에 대해 살펴봤다. 이번 포스트에서는 스파크에서 제공하는 RDD, DataFrame, Dataset이라는 데이터 구조를 살펴보고 그 차이를 분석해보려 한다

 

RDD는 변경이 불가한, 회복력 있는 분산 저장된 데이터라고 설명했었다. 그렇다면 DataFrame과 Dataset은 무엇일까?

 

[출처] 프로그래머스 데이터 엔지니어링 데브코스

 

1. Spark SQL 엔진

두 데이터 구조에 대해 설명하기에 앞서 이 둘이 어떤 엔진 위에서 실행되는지 아는게 중요할 것 같다. DataFrame, Dataset은 모두 Spark SQL 엔진 위에서 동작한다. Catalyst optimizer는 이 엔진의 핵심 최적화 도구인데, 사용자가 데이터 프레임이나 데이터셋을 통해 연산을 정의하면 SQL 쿼리와 유사한 방식으로 연산을 논리적/물리적 계획으로 변환하고 이를 최적화한다

 

최적화된 물리적 계획은 결국 RDD API로 변환되어, RDD가 내부적으로 연산을 실행하게 된다. 즉, 스파크의 저수준 API인 RDD는 여전히 최종 연산의 실행 단위로 사용된다

 

RDD와의 큰 차이가 여기에 있다. RDD 또한 최적화를 진행하지만, Catalyst optimizer와 같은 고급 최적화 기능이 없기 때문에, 데이터 프레임과 데이터셋에 비해 성능이 떨어질 수 있다

 

 

[출처] https://medium.com/geekculture/introduction-to-datasets-in-spark-79a7d94d9158

2. DataFrame

데이터 프레임은 RDD와 달리 필드 정보를 가지고 있어 테이블에 쿼리를 날리듯 컬럼을 기반으로 여러 연산 함수를 사용할 수 있다. 다시 말하면, 각 컬럼의 이름과 타입을 포함하는 스키마 정보를 저장해 사용자가 데이터를 SQL 연산과 유사한 방식으로 더 쉽게 처리할 수 있게 해준다

 

다음은 데이터 프레임을 조작하는 예시다

// select 함수
val df = spark.read.option("header", "true").csv("data.csv")
val selectedDf = df.select("name", "age")
selectedDf.show()

// join 함수
val df1 = spark.read.parquet("path/to/file1.parquet")
val df2 = spark.read.parquet("path/to/file2.parquet")

val joinedDf = df1.join(df2, "id") // id 컬럼을 기준으로 내부 조인
joinedDf.show()

 

위 코드에서 볼 수 있듯이 CSV, Json, Parquet 등의 파일 등 다양한 데이터 소스에서 구조화된 데이터를 로드해 데이터 프레임을 만들 수 있다. 또한 select, filter, groupBy, agg와 같은 SQL 스타일의 연산을 제공하기 때문에 이런 연산을 사용하는 경우 유용하다

 

앞서 언급했듯이 데이터 프레임은 Catalyst optimizer에 의해 연산이 최적화되어 성능이 더 좋고, Python/Scala/Java/R 모두에서 지원된다

 

 

3. Dataset

데이터셋은 RDD와 데이터 프레임의 장점을 결합한 것으로 가장 최신에 나온 데이터 구조다. RDD는 Catalyst optimizer를 통한 최적화가 이뤄지지 않아 성능 측면에서의 한계가 있고, 데이터 프레임은 SQL 스타일의 여러 연산을 제공해 데이터 처리를 쉽게 만들어주지만, 데이터 타입에 대한 안정성을 지원하지 않아 컴파일시 오류를 발견하기 어렵다

 

데이터셋은 이 둘의 단점을 보완하는데, RDD의 성능 이슈는 Catalyst optimizer를 통해 최적화하고, 타입 시스템을 통한 타입 안정성을 보장해 컴파일시 타입 오류를 발견할 수 있게 해준다. 또한 JIT 컴파일러와 같은 JVM의 최적화 기능을 활용해 처리 속도를 높일 수 있다. 데이터 프레임도 최적화되긴 하지만, 데이터셋의 이런 특성 덕분에 데이터셋이 더 빠르다

 

타입 안정성에 대해 자세한 예시로 살펴보자. 먼저 타입 안정성을 보장하는 데이터셋을 활용한 예시다. 아래서 볼 수 있듯이 데이터 타입이 명확해 타입을 고려한 연산 수행시 오류가 날 가능성이 없다

// 학생 정보에 대한 데이터 클래스 정의
case class Student(name: String, age: Int, grade: Double)

// 학생 정보를 담고 있는 Seq 생성
val studentSeq = Seq(
	Student("Alice", 20, 88.5),
    Student("Bob", 22, 91.0),
    Student("Charlie", 19, 78.0)
)

// Dataset[Student] 생성
val studentDS: Dataset[Student] = spark.createDataset(studentSeq)

// 타입 안정성을 활용한 특정 연산 수행
// 평균 점수 계산
val averageGrade = studentDS.map(_.grade).reduce(_ + _) / studentDS.count()
println(s"Average Grade: $averageGrade")

// 특정 나이 이상의 학생 필터링
val oldefStudents = studentDS.filter(_.age > 20)

// 출력
studentDS.show()

 

 

다음은 데이터 프레임을 통한 예시다. 아래 코드에서 seq을 데이터 프레임으로 변환할 때, 각 타입에 대해 올바른 데이터 값이 들어왔는지 확인하지 않아 컴파일시 오류가 발생할 수 있다

// 학생 정보에 대한 데이터 클래스 정의
case class Student(name: String, age: Int, grade: Double)

// 학생 정보를 담고 있는 Seq 생성
val studentSeq = Seq(
    Student("Alice", 20, 88.5),
    Student("Bob", 22, 91.0),
    Student("Charlie", 19, 78.0)
)

// dataframe 생성 - 타입 안정성이 부족함 = 즉, 타입에 맞는 데이터가 들어왔는지 확인하지 않음
val studentDF = spark.createDataFrame(studentSeq)

// 평균 점수 계산
val averageGradeDF = studentDF.select(avg("grade")).as[Double].first() // Double 타입으로 반환
println(s"Average Grade: $averageGradeDF")

// 특정 나이 이상의 학생 필터링
val olderStudentsDF = studentDF.filter(col("age") > 20)
olderStudentsDF.show()

 

데이터셋은 Java, Scala에서만 사용할 수 있는데, 그 이유는 Python이 동적 타입 언어라는데 있다. Java, Scala는 정적 타입 언어로 컴파일 시점에 타입 오류를 발견할 수 있지만 Python은 런타임시 타입 체크를 수행하기 때문에 데이터의 오류를 발견할 수 없다. 그래서 Python에서는 제공되지 않는듯 하다

 

결론적으로 데이터셋은 가장 최신에 나온 데이터 구조이니 만큼, RDD와 데이터 프레임의 단점을 보완하고 사용자에게 더 복잡하고 효율적인 처리를 가능하게 해준다

 

 

4. 정리

이번 포스트에서는 RDD, Dataframe, Dataset에 대해 살펴봤다. Dataframe, Dataset 또한 최종적으론 RDD를 기반으로 실행된다. 하지만 사용자에게 더 높은 수준의 추상화된 조작을 제공한다는 것과 Catalyst optimizer를 통한 최적화로 성능을 높이는게 정말 큰 장점인 것 같다. Dataset은 특히 JIT 컴파일러를 통해 더 빠른 성능을 제공하기 때문에 사용을 안할 이유가 없을것 같다

 

스파크에 대해 공부하며 멘토님이 왠만하면 Scala를 배워 사용하는게 좋다 하셨는데, 그 이유가 조금은 이해가 간다. 빅데이터 처리에서 정말 중요한건 무엇보다 데이터 타입이 지켜지는지 등에 대한 데이터 무결성일텐데 Dataset을 사용하면 이러한 장점을 다 가지고 갈 수 있다

 

다음 포스트로는 어떤걸 할지 고민이당

 

 

 

[참고]https://medium.com/@ashwin_kumar_/spark-rdd-vs-dataframe-vs-dataset-c90f7da18e56

 

Spark RDD vs DataFrame vs Dataset

Curious about the differences between Spark RDD, DataFrame, and Dataset? Let’s dive in and explore the complexities of these data…

medium.com