2.2.1 Spark Memory
Last updated
Last updated
이번 챕터에서는 Spark 의 메모리 구조에 대해서 알아보겠습니다.
Spark 는 JVM 위에서 실행됩니다. PySpark 를 쓰는 경우에는 외부에 Python 프로세스가 존재할 수 있으나 Driver 또는 Executor 를 위한 JVM 이 실행되는건 동일합니다.
지난 챕터에서 Spark 모드에 따른 구분에서 알아보았듯이 Spark 는 다양한 실행 모드를 지원하고 각 실행 모드에 따라 컴포넌트의 실행 위치가 달라지게 됩니다.
Local 모드라면 단일 JVM 내에 Driver 와 Executor 가 존재합니다
Client 모드라면 Driver JVM 과 다수의 Executor JVM 이 존재합니다
Cluster 모드라면, JVM 기준으로 Client 모드와 동일하되 Job 을 Submit / Wait 하는 JVM 이 존재할 수 있습니다.
Spark 에서는 위 사진에 나온 다양한 컴포넌트들의 메모리를 조절할 수 있는 옵션을 제공합니다.
Configuration | Description |
---|---|
spark.driver.cores | Number of cores to use for the driver process (only in cluster mode) |
spark.driver.memory | Amount of memory to use for the driver process |
spark.driver.memoryOverhead | Amount of non-heap memory to be allocated per driver process in cluster mode This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. |
spark.executor.cores | The number of cores to use on each executors |
spark.executor.memory | Amount of memory to use per executor proces |
spark.executor.memoryOverhead | Amount of additional memory to be allocated per executor process This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. |
spark.executor.pyspark.memory | The amount of memory to be allocated to PySpark in each executor |
spark.memory.fraction | Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur |
spark.memory.storageFraction | Amount of storage memory immune to eviction, expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory may be available to execution and tasks may spill to disk more often. |
spark.memory.offHeap.enabled | If true, Spark will attempt to use off-heap memory for certain operations |
spark.memory.offHeap.size | The absolute amount of memory which can be used for off-heap allocatio |
spark.python.worker.memory | Amount of memory to use per python worker process during aggregation |
spark.kubernetes.driver.limit.cores | |
spark.kubernetes.executor.limits.cores | |
spark.kubernetes.driver.request.cores | |
spark.kubernetes.executor.request.cores | |
spark.kubernetes.memoryOverheadFactor |
아래 스크린샷은 Spark UI 에서 Spark Driver / Executor 의 리소스 설정값들을 확인하는 것을 보여줍니다. (https://spark.rstudio.com/guides/connections/)
spark.driver.memory 등 일부 Driver 옵션은 Client 모드의 경우 spark.driver.memory 를 세팅하는 것이 아니라 --driver-memory 커맨드라인 옵션을 통해 제공되어야 합니다.
Driver JVM 이 spark.driver.memory 를 읽는 시점에 이미 실행되었기 때문입니다 (JVM 을 위한 메모리가 실행 이전에 세팅이 되어야 함)
> Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file. 또한 Local 모드의 경우 Executor 가 Driver JVM 내에 위치하므로 --driver-memory 옵션을 통해 세팅할 수 있습니다.
이런 모드별 설정값들을 올바르게 세팅하는 것은 여러분이 팀 / 사내에 Jupyter 등 Spark 를 위한 인프라를 제공할 때 매우 중요합니다.
spark.driver.cores, spark.executor.memory 와 같은 옵션은 개별 컴포넌트의 CPU 코어와 사용 가능한 메모리를 지정하기 때문에 설정의 이름만 보아도 직관적으로 알 수 있습니다. 그러나 다음의 설정값들은 이름으로 부터 이해가 바로 어려운데 Executor 를 기준으로 하나씩 살펴보겠습니다.
spark.memory.fraction
spark.memory.storageFraction
spark.executor.memoryOverhead
우선 Spark Executor 의 JVM Heap 메모리를 크게 다음과 같이 나눌 수 있습니다.
Spark Memory (spark.memory.fraction = 0.6, default)
Storage Memory (spark.memory.storageFraction = 0.5, default)
캐싱 (DataFrame.cache, CACHE TABLE) 또는 Broadcast, Driver 로 보내는 결과들이 이 영역의 메모리를 사용합니다.
Execution Memory (spark.memory.storageFraction 를 제외한 spark.memory.fraction)
데이터 집계 과정에서 Shuffle, Aggregation, Sort 등을 위해 사용합니다
User Memory (전체 JVM Heap 에서 spark.memory.fraction 와 Reserved Memory 를 제외한)
Spark 가 사용하는 내부 메타데이터, 사용자 생성 데이터 구조 저장이나 UDF 및 OOM 을 방지하기 위한 대비 (Safeguard) 영역으로 사용합니다.
Reserved Memory (300 Mib)
일반적으로 'Executor 메모리가 부족하다' 라고 말하면 Spark Memory 가 부족한 경우가 대부분입니다. 이 경우에는
Executor 가 사용하는 전체 JVM 메모리 사이즈를 늘리거나
spark.memory.fraction 값을 올릴 수 있습니다.
캐싱을 많이 사용한다면 Storage Memory 가 모자랄 수 있습니다. spark.memory.storageFraction 값을 늘릴수도 있겠지만, Spark 1.6 부터는 Unified Memory Management (SPARK-10000) 가 도입되면서 'Spark Memory 1.6+' 스크린샷의 초록색영역 (Spark Memory) 통합되었기 때문에 큰 효과가 없을 수 있습니다. 만약 메모리가 부족하다고 판단이 되면, 전체 메모리를 늘리는 편이 낫습니다.
Spark Memory 가 통합되면서
캐싱을 (Storage) 사용하지 않을 경우에는 Execution (집계) 를 위해 Stroage Memory 영역을 사용할 수가 있게 되었고
캐싱을 (Storage) 많이 사용한다면 Execution Memory 영역을 필요시 더 사용할 수 있게 되었습니다.
spark.memory.storageFraction 값은 이제 절대적인 Stroage Memory 양이 아니라, Evition 되지 않는 최대 메모리 양을 지정하는 옵션이 되었습니다
Spark 에서 캐싱에 대한 한 가지 오해는 "캐싱된 데이터는 메모리에 계속 존재한다" 입니다.
Spark 는 In-memory 컴퓨팅을 '지원' 하는 것이지 In-memory 만으로 집계를 수행하지 않습니다. 만약 데이터가 메모리에 들어갈 수 없다면 중간 집계 결과를 Disk 등에 남길 수 있습니다.
캐싱된 RDD Partition 또한 마찬가지입니다. SPARK-14289 (Multiple Eviction Strategies for Cached Partition)에서 볼 수 있듯이
Spark 는 신규 캐싱등을 위해 Storage Memory 가 부족할 경우 기존의 캐싱된 RDD Partition 을 LRU 등의 정책을 바탕으로 내보낼 (Eviction) 수 있습니다
축출된 (Evicted) RDD Partition 은 추후에 다시 필요하다면, Transformation 등을 실행 계획의 내용을 바탕으로 다시 계산을 수행해 만들거나, 아니면 Disk 에 저장된 내용을 바로 읽어 사용할 수 있습니다. 캐싱 함수의 옵션으로 제공되는 Storage Level 에 의해서 동작 방식을 조절할 수 있습니다.
> Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
MEMORY_AND_DISK_SER 는 언제 필요할까요? 캐싱된 RDD Partition 이 Eviction 될 때 Disk 에 쓰이는 경우를 가정하고 이야기 해 봅시다.
만약 사용하지 않음에도 unpersist() 함수를 호출하지 않는다면, Disk 사용량과 연산 속도가 어떻게 될까요?
> In Spark, execution and storage share a unified region (M). When no execution memory is used, storage can acquire all the available memory and vice versa. Execution may evict storage if necessary, but only until total storage memory usage falls under a certain threshold (R). In other words, R describes a subregion within M where cached blocks are never evicted. Storage may not evict execution due to complexities in implementation.
아래 그림은 Spark Memory 의 각 옵션이 숫자로 어떻게 적용되는지를 볼 수 있는 그림입니다.
Memory Overhead 및 Off-heap 옵션은 JVM 메모리 외의 영역에서 Spark 가 사용할 메모리를 지정합니다. Spark 는 버전에 따라 옵션이 많이 변화했습니다. Spark 3.0+ 를 기준으로 보면 JVM 외 영역에서
spark.executor.memoryOverhead (= executor.memory * 0.1, default)
PySpark 를 사용할 경우 Python Process 의 메모리 (spark.executor.pyspark.memory) 등 Non-JVM 메모리 영역을 지정합니다.
spark.memory.offHeap.size (= false, default)
String 을 저장하는 등 Java (JVM) 이 내부적으로 사용하는 용도 및 Spark 의 특정 기능을 위해 사용되기도 합니다
이제 JVM 및 Non-JVM 메모리 영역을 모아 Executor 메모리 전체를 한 눈에 살펴보겠습니다.
만약 Off-heap 메모리 관리 기능을 spark.memory.offHeap.enabled 옵션을 통해 활성화 한다면 Spark 에서는 Stroage Memory, Execution Memory 를 On-heap 이외에도 Off-heap 까지 활용하게 됩니다.
On-heap 을 사용할 경우와 Off-heap 을 사용할 경우의 차이점은 무엇일까요? 객체가 매우 많은 경우 Heap 내에서 GC 가 발생할 때의 문제점에 대해서도 이야기 해 봅시다.
[참고] Druid 와 같은 JVM 으로 실행되는 스토리지도 및 쿼리 엔진도 Off-heap 을 많이 활용하곤 합니다
Spark 내부의 Tungsten 이라 불리는 실행 엔진은 Off-heap 메모리 관리 기능을 제공합니다
GC 를 피하기 위해 Off-heap 기능을 이용할 수 있지만, 그렇지 않더라도 Spark 는 이미 내부적으로 충분히 GC 에 최적화된 방법으로 객체를 생성하고 관리하고 있습니다. (Spark Off-heap Memory Config And Tungsten )
Spark/Tungsten use Encoders/Decoders to represent JVM objects as a highly specialized Spark SQL Types objects which then can be serialized and operated on in a highly performant way. Internal format representation is highly efficient and friendly to GC memory utilization.
Thus, even operating in the default on-heap mode Tungsten alleviates the great overhead of JVM objects memory layout and the GC operating time. Tungsten in that mode does allocate objects on heap for its internal purposes and the allocation memory chunks might be huge but it happens much less frequently and survives GC generation transitions smoothly. This almost eliminates the need to consider moving this internal structure off-heap.
In our experiments with this mode on and off we did not see a considerable run time improvements. But what you get with off-heap mode on is that one need to carefully design for the memory allocation outside of you JVM process. This might impose some difficulties within container managers like YARN, Mesos etc when you will need to allow and plan for additional memory chunks besides your JVM process configuration.
Also in off-heap mode Tungsten uses sun.misc.Unsafe which might not be a desired or even possible in your deployment scenarios (with restrictive java security manager configuration for example).
PySpark 를 사용한다면 다음 두 가지의 메모리 옵션을 설정할 수 있습니다.
spark.python.worker.memory (512m, default) 는 JVM 내에서 Python Worker 의 집계를 위해 사용되는 영역입니다
spark.executor.pyspark.memory (설정되지 않음, default) 는 실제 Python Process 의 메모리입니다
spark.executor.pyspark.memory 는 기본값이 설정되어 있지 않으므로 PySpark 사용시 DataFrame 대신 일반 Python 객체와 함수를 이용해 가공하는 등 메모리를 많이 사용할 경우 메모리가 터질 수 있습니다.
AWS EMR 은 AWS 가 제공하는 관리형 빅데이터 클러스터입니다.
그림에서 볼 수 있듯이 EMR 을 이용하면 기존의 Hadoop 클러스터를 손쉽게 대체할 수 있습니다. AWS 에서는 EMR 과 연동된 수 많은 기능을 제공하기 때문에 단순히 대체하는 것을 넘어, 추가적인 기능을 사용할 수 있다는 장점이 있습니다.
EMR 클러스터 생성시 Zeppelin, JupyterHub 등의 시스템은 물론 Flink, Presto 등 다양한 빅데이터 인프라를 설치할 수 있습니다. 다만 용도에 맞추어 개별 클러스터를 생성하는 편이 낫습니다.
Yarn 을 이용해 Spark Application 를 실행할 경우에 Driver, Executor 의 Core 및 Memory 옵션은 이전 섹션에서 설명한것 과 동일합니다. 다만 EMR 과 Yarn 의 일부 옵션 관련해서 조금 살펴볼 필요가 있습니다.
Spark 를 Yarn Cluster 모드로 실행하게 될 경우 EMR 5 버전에서는 아래의 옵션으로 인해 EMR Core 노드에서 실행됩니다. (EMR 6 에서는 아래의 설정값들이 제거되었습니다. 따라서 필요할 경우 EMR Configuration 에 직접 추가해야 합니다)
EMR 에서 Core 노드는 HDFS 데몬을 실행하고 데이터를 유지합니다. 다만 AWS EMR 사용자 대다수가 HDFS 를 이용하진 않으므로 큰 의미는 없습니다.
Spark Log 등이 작업 종료 후 Aggregation 되어 로 HDFS 에 저장됩니다
각 노드 타입에 대해서는 Understanding EMR Node Types 문서에서 자세한 내용을 살펴볼 수 있습니다.
Yarn Application Master (AM), 즉 Spark 의 경우엔 Driver 는 위에서 언급한 Label 이 존재할때 Core 노드에서 동작합니다. 다만 EMR 에서는 Yarn 스케쥴러의 설정이 DefaultResourceCalculator 가 Default 이므로 DominantResourceCalculator 사용을 위해서는 직접 세팅해야 합니다 (capacity-scheduler.xml)
org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator 는 메모리 기반으로 스케쥴링을 하기 때문에 1개를 초과하는 vCPU 가 할당되지 않습니다.
org.apache.hadoop.yarn.util.resource.DominantResourceCalculator 는 CPU + 메모리를 고려해 스케쥴링 하기 때문에 사용자가 요청한 수의 vCPU 를 할당합니다
또한 자동으로 세팅되는 vCPU Max 값 이상의 vCPU 를 할당하고 싶을 경우 yarn.scheduler.maximum-allocation-vcores 옵션을 수정해야 할 필요가 있습니다.
EMR 에서는 인스턴스 사이즈를 고려해 yarn.scheduler.maximum-allocation-vcores 를 할당하나, Core (Spark Driver) / Task (Spark Executor) 노드의 EC2 타입이 매우 다를경우 작게 설정될 수 있기 때문입니다.
일반적으로는 다른 EC2 타입을 사용하더라도 (m5, c5, r5) 등 CPU 와 리소스의 배수 비율을 어느정도 맞추는 편이 낫습니다.
yarn.nodemanager.resource.memory-mb, yarn.scheduler.minimum-allocation-mb, yarn.scheduler.maximum-allocation-mb 는 메모리 관련 설정입니다
Task 인스턴스 타입별 Yarn allocation-mb 사이즈 문서에서 각 인스턴스별로 사이즈가 어떻게 세팅이 되는지 살펴봅시다.
만약 인스턴스가 제공하는 메모리 양과 yarn.nodemanager.resource.memory-mb 이 다르다면 왜 그런지 이유를 생각해 봅시다.
아래 코드는 EMR 생성시 위에서 언급한 옵션들을 수정할 수 있도록 한 설정 코드입니다.
EMR 에는 Core / Task 란 개념이 있지만, Yarn 은 Core / Task 를 분리해서 생각하지 않고 리소스를 관리합니다. 따라서 yarn.scheduler.capacity.maximum-am-resource-percent 옵션이 0.5 로 잡혀 있어, Core 노드에 리소스가 있음에도 Driver 에 리소스 할당이 되지 않을 수 있습니다.
yarn.scheduler.capacity.maximum-am-resource-percent 옵션은 전체 리소스 중 Application Master (Spark Driver) 가 사용할 수 있는 리소스의 비율을 결정합니다.
EMR 의 yarn.scheduler.capacity.maximum-am-resource-percent 기본값은 0.5 입니다.
[실습] EMR 클러스터를 생성해보고, 위에서 언급한 옵션이 어디에 기술되어 있는지 살펴봅시다.
EMR 사용시 maximizeResourceAllocation 옵션을 이용하면 사용자가 Driver, Executor 의 리소스를 직접 세팅하는 것이 아니라 AWS 가 Core / Task 인스턴스의 사이즈를 고려해 할당합니다.
그러나 Long-running 클러스터는 리소스를 분할해 사용하고, 사용자가 세밀히 리소스를 컨트롤할 필요가 있으므로 이 옵션을 사용하지 않는 편입니다.
만약 EMR 클러스터를 1회성으로 실행하고 폐기해야 한다면, 이 옵션이 유용할 경우가 있습니다. 다만 기본으로 설정되는 Paralleism 값이 작은 편입니다.
Spark 3.1+ 부터 Kubernetes 를 Cluster Manager 로 사용할 수 있습니다. (GA 버전 기준) Spark 를 Kubernetes 에서 사용할 경우 EMR 이 제공하는 몇몇 특화 기능들은 사용할 수 없지만 (EMRFS S3-Optimized Commiter, EMR Decomission, EMR Autoscaling 등), 그럼에도 몇 가지 이점들이 있습니다.
위 그림에서 좌측은 EMR 을 사용하는 경우입니다. EMR 은 클러스터 하나당 단일 Spark 버전만 지원하기 때문에 여러 Spark 버전을 사용해야 한다면 Spark 관리가 어렵습니다. 또한 Kubernetes 가 제공하는 Cluster Autoscaler 등 다양한 기능들을 이용할 수 있습니다.
Spark 를 Kubernetes 에서 실행할 경우 아래의 옵션들로 Core 및 Memory 옵션을 지정할 수 있습니다. Memory 의 경우에는 기존과 동일하게 spark.driver.memory, spark.executor.memory 옵션으로 설정이 가능합니다.
이 중에서 memoryOverheadFactor 라는 부분이 기존의 Yarn 에서 Spark 를 할당할때와는 조금 다른 옵션입니다. (Yarn 도 overhead 만큼의 메모리를 추가적으로 할당하나 factor 가 아니라 메모리 사이즈를 고정값으로 받았습니다)
spark.kubernetes.memoryOverheadFactor (= 0.1, default) 옵션으로 Non-NVM 영역의 메모리를 Kubernetes Pod 에 추가할 수 있습니다.
예를 들어 spark.executor.memory = 120g 및 spark.kubernetes.memoryOverheadFactor = 0.1 인 경우에 Pod 은 10% 만큼의 Overhead 추가해 132g (GiB) 가 할당됩니다
많은 경우에 Scala 를 이용해 Spark Batch / Stream Application 을 작성합니다. 다만 사용자의 편의나 특정 Python 라이브러리 사용등을 이유로 Python 을 사용할 수 있는데, 이번 섹션에서는 PySpark 의 구조에 대해 간단히 알아봅니다.
PySpark 를 사용하면 Scala Spark 를 사용할때와는 다르게 Python 프로세스가 존재합니다.
Python Driver Process 는 Py4j 를 이용해서 별도 JVM 프로세스에 Spark Context 를 생성합니다
PySpark 에서도 spark.sparkContext 객체가 존재하지만, 이것은 명령을 내리기 위한 객체이며 실제로는 명령을 받은 JVM 내의 SparkContext 가 필요한 작업을 수행합니다
같은 노드 내에 있더라도 Python Process 와 JVM Process 는 서로 다른 프로세스이므로 데이터 (메모리) 를 공유할 수 없습니다. IPC (Inter-process Communication) 간 통신을 위해 Socket 을 이용합니다. (Executor 는 Pipe 를 사용합니다)
Scala Spark 를 이용할 경우 필요 없을 Socket 통신을 이용해 데이터를 주고 받으므로 PySpark 는 느린 경우가 많습니다.
예를 들어 PySpark 로 큰 사이즈의 S3 Parquet 파일을 읽어 Pandas 로 가공하고 싶다면 다음과 같은 프로세스를 거칩니다
Executor 가 데이터를 읽어 Driver JVM 로 전송합니다. Driver JVM 은 다시 Driver Python 으로 보내기 위해 데이터를 Temp File 등에 기록하고 이것을 Driver Python 이 읽습니다
이 과정에서 네트워크를 통해 데이터를 넘기고, Disk 에 파일을 쓰며 Serialization / Deserialization 이 발생합니다
Arrow 와 같은 공통화된 메모리 직렬 포맷을 이용한다면 Serialization / Deserialization 을 효율적으로 수행할 수 있습니다. PySpark 의 toPandas 는 Arrow 가 활성화 되어 있을 경우 이를 이용하도록 구현되었습니다.
spark.sql.execution.arrow.pyspark.enable 옵션을 통해 활성화가 가능합니
만약 Python UDF 와 같은 Python 코드를 Executor 가 실행해야 한다면 어떨까요?
Python 코드를 실행하기 위해 Executor 내의 Partition 들이 Executor 내 Python Process 로 Serialization / Deserialization 되어야 합니다.
아래의 두 그림을 통해 JVM (Scala / Java) UDF 일 경우와 Python UDF 일 경우를 비교할 수 있습니다
실습 과제입니다.
다음과 같은 오류 메세지를 발견했을 때 어떤 옵션을 조절하면 좋을지 고민해 봅시다. 조건은 다음과 같습니다.
이 때, 아래와 같은 오류 메세지가 Parquet Write 를 하는 과정에서 발생했습니다. (노트북에서 테스트해봐도 Join, Aggregation 등은 문제가 없었습니다.)
ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 33.2 GB of 33 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
메모리 관련된 설정은 어떻게 잡혀있을까 싶어 Spark UI 에서 Environment 탭을 확인해 보았습니다.
Spark Version 은 2.4.8 을 사용하고 있습니다.
다음은 메모리 옵션을 수정하기 전 생각해볼 만한 몇 가지 가설입니다.
Join, Aggregation 등에서 Memory 가 터졌다면 Heap OOM 메세지가 발생했을 것이다.
일반적인 경우엔 Memory 가 넘친다면 Disk Spill 을 이용해 속도는 느리겠지만 집계할 수 있다.
Write 하는 과정에서 터졌다면, 특정 Execurtor 내 파티션에 데이터가 몰린걸까? (Skew) 그렇다면 어떻게 해결할 수 있을까? 데이터 내에는 사용자 ID 및 사용자가 발생시킨 이벤트 타입과 시간 값이 있다.
이제는 아래와 같은 메세지가 발생합니다. 왜 그런지 고민해 봅시다.
spark.executor.memory 를 줄이면, 메모리 영역중 Execution 메모리 (Group by, Window 등 Aggregation 을 위한) 가 줄어듭니다. 어떤 영향이 있을까요?
만약 메모리는 넉넉한데 Group By, Window Function 등에서 Skew 가 발생한다면 어떤 옵션을 수정하는 것이 좋을까요?
1) spark.sql.shuffle.partitions
숫자를 늘려 Skew 확률을 낮춘다.
2) Group By, Window Function 전 해당 집계에서 사용하는 Key 를 기준으로 repartition(X, "key") 를 진행한다.
Heap OOM 이 발생할 경우에는 다음과 같은 메세지가 남습니다.
아래는 이번 시간에 다룬 핵심 키워드들 입니다.
Arrow
UDF
Yarn Scheduler: DefaultResourceCalculator, DominantResourceCalculator, FairScheduler
Garbage Collection (GC)
Storage Memory / Execution Memory