2.1.7 Spark Cache
이번 챕터에서는 Spark 데이터 가공을 빠르고 효율적으로 하기 위한 Cache 기능에 대해 사례를 통해 살펴봅니다.
이번시간에 사용할 데이터셋은 Kaggle - Denver AirBNB 입니다. 파일을 다운받은 후 다음과 같이 이름을 변경합니다.
calendar.csv -> airbnb_calendar.csv
listing.csv -> airbnb_listings.csv
neighbourhoods.csv -> airbnb_neighbourhoods.csv
reviews.csv -> airbnb_reviews.csvCache 사용 방법
지난 챕터들에서 Spark 는 '논리적' 인 수준에서 데이터를 파티션 단위로 나누어 처리한다고 이야기를 나누었습니다. 다시 그림으로 살펴보면 아래와 같습니다.


여기서 Worker Node 는 EC2 와 같은 개별 머신입니다. 하나의 EC2 에 여러개의 Executor (JVM) 을 실행할 수도 있습니다.
개별 Executor 내의 설정에 의해 허용된 메모리량 만큼 Partition 데이터를 캐싱하는것이 가능하며, BlockManager 에 의해 관리됩니다. 만약 데이터가 메모리에 들어가기 충분하지 않으면 Disk 를 사용할 수 있습니다.
지난시간에 만들었던 airbnb_listing_parquet (200개 파티션) 디렉토리를 읽어서 cache() 함수를 통해 메모리에 데이터를 올려보겠습니다.
만약 해당 디렉토리를 준비하지 않았다면, 다른 데이터를 이용해도 상관 없습니다. 다만 아래에서 실행할 explain() 으로 나오는 Plan 의 결과가 다를 수 있습니다.
이제 cache() 함수를 사용해보겠습니다. 동일한 데이터에 대해 cache() 를 호출하면
Spark Executor 내에 분산되어 DataFrame 이 저장됩니다.
storageLevel = MEMORY_AND_DISK 이므로 메모리가 부족할 경우 Disk 를 사용할 수 있습니다.
Spark UI 를 확인해보면 다음의 내용을 확인할 수 있습니다.
Executors 탭에서는 cache() 를 호출하기 이전에 비해 Storage Memory 가 늘었습니다.
Storage 탭에서는 197 개 만큼의 Cached Partitions 이 7.1 MiB 만큼 메모리에 존재합니다. (이 숫자는 파티션과 동일합니다)
SQL 탭에서 나오는 두개의 count() Action 을 살펴보면 두번째 count() 호출, 즉 dfCached.count() 에 InMemoryTableScan 이라는 단계가 추가된 것을 볼 수 있습니다.




두개의 스크린샷을 비교해 봅시다.
좌측의 Details for Query 0 은 cache() 를 호출하지 않은 df.count() 의 실행 계획입니다
우측의 Details for Query 1 은 cache() 를 호출한 dfCached() 의 실행 계획입니다
InMemoryTableScan 이라는 단계가 추가되었음을 알 수 있습니다
하단의 Details 버튼을 누르거나 explain("FORMATTED") 를 통해 살펴보면 텍스트로 된 실행 계획을 볼 수 있습니다. InMemoryTableScan 을 통해 데이터를 다시 Parquet 로 부터 읽지 않고 메모리에 캐싱된 영역에서 데이터를 읽어 집계합니다.
더이상 해당 데이터를 캐싱해서 사용하지 않는다면 unpersist() 함수를 통해 제거할 수 있습니다.
함수를 호출 후에 Spark UI 를 보면, Storage / Executors 탭에서 dfCached DataFrame 이 제거된 것을 볼 수 있습니다.
Cache 메모리 설정

Executors 탭을 다시 보면, Driver 의 Storage 메모리가 13.1 GiB 임을 알 수 있습니다. 사용자가 Spark Driver Memory 를 20g (GiB) 로 지정한것은 알겠는데 어떻게 Storage 메모리로 13.1 GiB 만큼 사용이 가능한 걸까요?
Spark 는 여러 옵션들로 메모리를 조정합니다. 추후에 Spark Memory 챕터에서 더 자세히 살펴 보겠지만 우선 필요한 부분만 알아보도록 하겠습니다.



Spark UI 의 Environment 탭을 살펴보면 다음처럼 메모리 설정을 볼 수 있습니다.
spark.memory.fraction 은 0.6 이 Default 지만, 현재는 0.75 가 세팅되어 있습니다
spark.memory.fraction 은 Spark 가 집계 (Execution) 및 저장 (Storage) 를 위해 사용할 수 있는 메모리 양을 지정합니다.
spark.memory.storageFraction 은 0.5 가 Default 지만, 현재는 0.2 가 세팅되어 있습니다.
spark.memory.storageFraction 은 spark.memory.fraction 영역 중에서 저장 (Stroage) 에 사용할 메모리 양을 지정합니다
다만 Spark 1.6+ 부터 Unified Memory Management 가 구현되었기에 (SPARK-10000) 엄밀하게 Execution / Storage 로 딱 잘리진 않습니다. 그림에서 화살표가 양방향으로 표기된 것도 위와 같은 이유입니다.
문서에 나와있는대로 Driver JVM Heap (20GiB) 에서 300 MiB 만큼을 제외하고 spark.memory.fraction 을 곱해보면 다음과 같이 수식이 나옵니다.
(전체 JVM 메모리 2048 - Reserved Memory 300) * spark.memory.fraction 0.75 = 1,311
약 13.1 GiB 입니다. 이게 바로 Spark UI 에서 보이는 Storage 13.1 GiB 의 값입니다.
Spark DataFrame 과 Cache
Spark DataFarme 의 cache() 함수를 사용하는 방법과 관련된 설정을 간단히 알아봤습니다.
cache() 는 위에서 소개한 것처럼 읽은 후 캐싱하기만 하는 용도가 아니라 Transformation 이 적용된 경우에도 당연히 사용이 가능합니다. DataFrame 이 제공하는 함수이기 때문입니다. 그러나 이 때는 조금 고려해야 할 부분이 있습니다. 예제를 통해 살펴봅시다.
count() 실행 후 Spark UI 의 SQL 탭에서 Plan 을 보면 아래와 같습니다.

dfTransformed.explain("FORMATTED") 로 Plan 을 보면
dfCached로 InMemoryRelation (4) 이 만들어지고dfTransformation으로 InMemoryRelation (2) 가 만들어집니다.
dfTransformation 을 이용해보면, InMemoryTableScan (1) 이 dfTransformation 즉, InMemoryRelation(2) 에서 필요한 Select 만 수행하는 것을 볼 수 있습니다. (InMemoryRelation 은 메모리에 캐싱되어 있는 DataFrame 을 지칭한다고 이해하셔도 당장은 무방합니다)
예제를 조금 변경해서 listing_id >= 30000000 인 경우에 대해 cache() 를 호출해보겠습니다. 그리고 변수를 저장하지 않고 사용해보겠습니다.
Plan 을 자세히 보면
dfCached.selectExpr("listing_id", "listing_name").where(col("listing_id") >= 30000000).cache()의 결과를DataFrame 으로 저장하지 않아도 캐싱이 (예정) 된 것을 볼 수 있습니다. 다만 Storage 탭을 보면 아직 저장된 RDD 가 없습니다. 이는 캐싱이 실제 수행되지 않은 상황인데 Action 을 호출하지 않았기 때문입니다.
다음 코드를 실행해보면 Stroage 탭에 DataFrame 이 등록되는 것을 볼 수 있습니다.

여기서 알 수 있는 사실은 cache() 를 호출하는것 만으로도 현재까지의 Transformation 이 적용된 DataFrame 이 캐싱된다는 것입니다. 그렇다면 아래의 코드를 실행하면 캐싱이 적용될까요?
Plan 을 보면 기존에 캐싱해 두었던 2 개의 DataFrame 이 전혀 사용되지 않은 것을 볼 수 있습니다.
얼핏 생각하면 listing_id >= 10000000 인 경우인 집합이 listing_id >= 30000000 인 집합을 포함하므로, 캐싱된 dfTransformed 을 충분히 이용할 것 같지만 실제로는 그렇지 않습니다.

Spark 실행 계획중 논리 실행 계획이 만들어지는 단계에서 Optimizer (최적화 단계) 보다 Cache Manager 가 더 먼저 동작하기 때문입니다. Spark 는 최적화된 Transformation 을 이용해 Cache 를 어떻게 사용할지를 결정하는것이 아니라 Cache 를 사용한 그 이후에 최적화 한다고 볼 수 있습니다.
따라서 캐싱을 사용하려면 cache() 를 호출한 결과 DataFrame 을 변수에 저장해 사용하는 편이 위와 같은 실수를 방지할 수 있습니다.
Spark SQL 과 Cache
DataFrame 의 createOrReplaceTempView() 를 이용하면 DataFrame 을 View 로 등록하고 spark.sql API 에서 테이블과 같이 FROM 구문에 사용할 수 있습니다.
이렇게 등록된 View 또한 DataFrame 이므로 캐싱이 가능합니다.
CACHE TABLE 을 이용하면, DataFrame.cache() 와는 다르게 즉시 캐싱됩니다. Spark UI 의 Storage 탭에서 확인할 수 있습니다. 복잡한 Transformation 식 대신 View 이름으로 (RAW_CACHED) 나오는것도 차이점입니다.

CACHE 'LAZY' TABLE 구문을 이용하면 즉시 캐싱하지 않고 Action 이 수행될때 스토리지에 DataFrame 의 데이터를 저장할 수 있습니다.
UNCACHE TABLE 구문을 이용하면 위에서 맏는 View 에 대한 캐싱을 제거할 수 있습니다. 만약 캐싱된 데이터 전체를 삭제하고 싶다면 CLEAR CACHE 구문을 이용하면 됩니다. CLEAR CACHE 구문은 DataFrame.cache() 로 등록된 내용도 같이 제거합니다.
Practice
실습 과제입니다.
Spark Memory Implementation 리서치
Unified Memory Management (SPARK-10000) PDF 를 읽어보며 Spark 1.6 이전과 이후에 Memory 영역이 어떻게 바뀌었는지 알아봅시다.
Storage 영역과 Execution 영역이 회수될때는 각각 어떤 제한점이 존재하는지도 이야기 해 봅시다.
Summary
아래는 이번 챕터에서 다룬 핵심 키워드입니다.
Storage Level
Execution Memory, Storage Memory
spark.memory.fraction
spark.memory.storageFraction
JVM: Heap, Off-heap
Table vs View
Last updated
Was this helpful?