하단의 Details 버튼을 누르거나 explain("FORMATTED") 를 통해 살펴보면 텍스트로 된 실행 계획을 볼 수 있습니다. InMemoryTableScan 을 통해 데이터를 다시 Parquet 로 부터 읽지 않고 메모리에 캐싱된 영역에서 데이터를 읽어 집계합니다.
더이상 해당 데이터를 캐싱해서 사용하지 않는다면 unpersist() 함수를 통해 제거할 수 있습니다.
dfCached.unpersist()
함수를 호출 후에 Spark UI 를 보면, Storage / Executors 탭에서 dfCached DataFrame 이 제거된 것을 볼 수 있습니다.
하나의 배치 작업이 끝나면 JVM (Executor) 도 제거되므로 DataFrame 을 cache() 한 것도 사라지기 마련입니다. 그런데 왜 unpersist() 함수가 필요할까요?
하나의 배치 작업에서 사용 가능한 메모리가 작은경우를 가정하여 논의해 봅시다.
Cache 메모리 설정
Executors 탭을 다시 보면, Driver 의 Storage 메모리가 13.1 GiB 임을 알 수 있습니다. 사용자가 Spark Driver Memory 를 20g (GiB) 로 지정한것은 알겠는데 어떻게 Storage 메모리로 13.1 GiB 만큼 사용이 가능한 걸까요?
Spark 는 여러 옵션들로 메모리를 조정합니다. 추후에 Spark Memory 챕터에서 더 자세히 살펴 보겠지만 우선 필요한 부분만 알아보도록 하겠습니다.
Memory usage in Spark largely falls under one of two categories: execution and storage. Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, while storage memory refers to that used for caching and propagating internal data across the cluster. In Spark, execution and storage share a unified region (M). When no execution memory is used, storage can acquire all the available memory and vice versa. Execution may evict storage if necessary, but only until total storage memory usage falls under a certain threshold (R). In other words, R describes a subregion within M where cached blocks are never evicted. Storage may not evict execution due to complexities in implementation.
약 13.1 GiB 입니다. 이게 바로 Spark UI 에서 보이는 Storage 13.1 GiB 의 값입니다.
Spark 의 spark.memory.fraction 값에 대해서 생각해 봅시다
spark.memory.fraction 값이 너무 높으면 무슨 일이 발생할까요?
spark.memory.fraction 값이 너무 낮으면 무슨 일이 발생할까요?
Spark DataFrame 과 Cache
Spark DataFarme 의 cache() 함수를 사용하는 방법과 관련된 설정을 간단히 알아봤습니다.
cache() 는 위에서 소개한 것처럼 읽은 후 캐싱하기만 하는 용도가 아니라 Transformation 이 적용된 경우에도 당연히 사용이 가능합니다. DataFrame 이 제공하는 함수이기 때문입니다. 그러나 이 때는 조금 고려해야 할 부분이 있습니다. 예제를 통해 살펴봅시다.
dfTransformation 으로 InMemoryRelation (2) 가 만들어집니다.
dfTransformation 을 이용해보면, InMemoryTableScan (1) 이 dfTransformation 즉, InMemoryRelation(2) 에서 필요한 Select 만 수행하는 것을 볼 수 있습니다. (InMemoryRelation 은 메모리에 캐싱되어 있는 DataFrame 을 지칭한다고 이해하셔도 당장은 무방합니다)
얼핏 생각하면 listing_id >= 10000000 인 경우인 집합이 listing_id >= 30000000 인 집합을 포함하므로, 캐싱된 dfTransformed 을 충분히 이용할 것 같지만 실제로는 그렇지 않습니다.
Spark 실행 계획중 논리 실행 계획이 만들어지는 단계에서 Optimizer (최적화 단계) 보다 Cache Manager 가 더 먼저 동작하기 때문입니다. Spark 는 최적화된 Transformation 을 이용해 Cache 를 어떻게 사용할지를 결정하는것이 아니라 Cache 를 사용한 그 이후에 최적화 한다고 볼 수 있습니다.
따라서 캐싱을 사용하려면 cache() 를 호출한 결과 DataFrame 을 변수에 저장해 사용하는 편이 위와 같은 실수를 방지할 수 있습니다.
Spark SQL 과 Cache
DataFrame 의 createOrReplaceTempView() 를 이용하면 DataFrame 을 View 로 등록하고 spark.sql API 에서 테이블과 같이 FROM 구문에 사용할 수 있습니다.
이렇게 등록된 View 또한 DataFrame 이므로 캐싱이 가능합니다.
df.createOrReplaceTempView("RAW")spark.sql("SELECT * FROM RAW LIMIT 10").show()
+----------+--------------------+--------------------+--------------------+--------------------+
|listing_id| listing_url| listing_name| listing_summary| listing_desc|
+----------+--------------------+--------------------+--------------------+--------------------+
| 12276698|https://www.airbn...|Downtown Casa in ...|Built in (Phone n...|Built in (Phone n...|
| 39589825|https://www.airbn...|Comfy Stapleton c...| null| null|
| 16676955|https://www.airbn...|Adorable Row Home...|Updated Spanish s...|Updated Spanish s...|
| 38638676|https://www.airbn...|Amenity Rich LUX ...| null| null|
| 33396764|https://www.airbn...|NE Dnvr Home 3br/...| null| null|
| 9842499|https://www.airbn...|Lg Light Bsmnt in...|Dbl BR and den w/...|Dbl BR and den w/...|
| 39150452|https://www.airbn...|Modern Chic Apart...| null| null|
| 39175439|https://www.airbn...|The Highlands Ret...|I live in a very ...|I live in a very ...|
| 35226322|https://www.airbn...|Mountain Views in...|Located in RiNO, ...|Located in RiNO, ...|
| 32902010|https://www.airbn...|Bright Cozy Room ...| null| null|
+----------+--------------------+--------------------+--------------------+--------------------+
CACHE TABLE 을 이용하면, DataFrame.cache() 와는 다르게 즉시 캐싱됩니다. Spark UI 의 Storage 탭에서 확인할 수 있습니다. 복잡한 Transformation 식 대신 View 이름으로 (RAW_CACHED) 나오는것도 차이점입니다.
spark.sql("""CACHE TABLE RAW_CACHED AS SELECT * FROM RAW""")
CACHE 'LAZY' TABLE 구문을 이용하면 즉시 캐싱하지 않고 Action 이 수행될때 스토리지에 DataFrame 의 데이터를 저장할 수 있습니다.
spark.sql("""CACHE LAZY TABLE RAW_CACHED_LAZY AS SELECT * FROM RAW WHERE listing_id >= 20000000""")spark.sql("SELECT * FROM RAW_CACHED_LAZY").explain("FORMATTED")
UNCACHE TABLE 구문을 이용하면 위에서 맏는 View 에 대한 캐싱을 제거할 수 있습니다. 만약 캐싱된 데이터 전체를 삭제하고 싶다면 CLEAR CACHE 구문을 이용하면 됩니다. CLEAR CACHE 구문은 DataFrame.cache() 로 등록된 내용도 같이 제거합니다.
spark.sql("""UNCACHE TABLE IF EXISTS RAW_CACHED""")spark.sql("""CLEAR CACHE""")# 뷰 제거spark.catalog.dropTempView("RAW")
캐싱에는 그 사이즈에 맞는 메모리가 필요합니다. 일반적으로 어떤 경우에 캐싱을 사용할까요? 다음 두 가지 경우를 고려해봅시다.
데이터가 크고 Cadinality 가 높은 사용자 이벤트 (ecommerce_event.csv)
데이터가 상대적으로 작고 메타 데이터인 Property 정보 테이블 (airbnb_listings.csv)