2.1.6 Spark Persistence

지난 챕터에서는 Spark 는 다양한 형태의 파일을 읽어 DataFrame, 즉 테이블 형태의 논리적인 추상을 제공한다는 것을 익혔습니다.

  • 사용자는 수 많은 파일들을 묶어, 하나의 테이블처럼 사용할수 있고

  • 대규모 처리가 필요할 경우 원하는 수준으로 Partition 을 분할해 복수개의 Executor 에서 나누어 처리할 수 있습니다

  • CSV 뿐만 아니라 JSON, Parquet, JDBC 등에서 데이터를 읽어 여러 DataFrame 을 만든뒤 Join 을 수행해 복합적인 데이터 가공 및 분석도 가능합니다

RDS / Aurora (MySQL 등) 에는 Snapshot Export 라는 기능이 있습니다. 이 기능을 이용하면 DB 의 특정 시점의 테이블들을 S3 에 Parquet 포맷으로 저장이 가능합니다.

서비스에 즉시 반영되어야 하는 Transaction 과는 상관없는 후처리 (분석 혹은 운영성 배치작업) 할 경우 S3 (또는 GCS 등) 에 저장되어 있는 Parquet 포맷을 읽어 데이터 가공을 진행한다면 RDB 에 부하를 주지 않으면서 저렴한 비용으로 (S3) 작업을 할 수 있습니다.

또한 RDB 에 있는 데이터가 뿐만 아니라 비즈니스 담당자가 구글 시트 등에서 관리하는 데이터도 CSV 로 만들어 S3 등에 올려둔다면 Join 후 가공하는 것도 가능합니다.

Spark 가 빌트인으로 읽을 수 있는 파일 포맷은 Spark SQL Guide - Generic Load / Save 문서에서 확인할 수 있습니다.

이번 챕터에서는 위와 같이 Spark DataFrame 을 이용해 데이터를 메모리에 올려 가공하기 위해 다양한 포맷의 데이터를 읽고 저장하는 방법에 대해 배워보겠습니다.

이번시간에 사용할 데이터셋은 Kaggle - Denver AirBNB 입니다. 파일을 다운받은 후 다음과 같이 이름을 변경합니다.

CSV & JSON

CSV 는 기존 챕터에서 사용하던 방식으로 읽을 수 있습니다. 다만 이번에는 데이터 내에 문자열 컬럼 (listing_description 등) 이 있고 여러 라인이 될 수 있기 때문에 옵션을 조금 조정합니다.

옵션을 살펴보면 다음 옵션이 추가되었음을 알 수 있습니다.

  • quote

  • escape

  • multiline

listing 은 AirBNB 의 상품인 숙소 (Property) 입니다. listing 마다 id 값이 부여되며, 이 listing_id 값을 이용해 아래에서 airbnb_calendar.csv 내의 숙소마다의 일별 가격과 같이 데이터를 보도록 하겠습니다.

JSON 파일을 읽는 방법도 동일합니다. 다만 현재는 JSON 파일이 없으므로, 위에서 읽은 airbnb_listings.csv 를 JSON 으로 Write 해본 후 다시 읽어보겠습니다.

listing 의 id, url, name, summary, description 만 SELECT 후 JSON 으로 Write 를 해보면

Repartition 값을 2개로 지정해 2개의 파일이 생성된것을 볼 수 있습니다. 위의 예시에서는 파일이 너무 작아 사실 나눌 필요는 없으나, 예시를 위에서 Partition 값을 1보다 크게 만들어 파일을 나누었습니다.

  • 파일의 경우 DataFrame.save 전에 repartition 된 숫자 만큼 파일이 생성됩니다.

  • 개별 파일 사이즈가 너무 크다면 repartition 을 통해 파일 숫자를 늘리면, 파일 사이즈가 줄어듭니다.

  • 개별 파일 사이즈가 너무 작다면 repartition 을 통해 파일 숫자를 줄이면, 파일 사이즈가 커집니다.

  • 파일 사이즈가 너무 크다면 파일 하나를 읽는데도 메모리가 부족할 수 있습니다.

  • 파일 숫자가 너무 많다면 전체 파일을 리스팅 하는 등 추가적인 부담이 들 수 있습니다.

Write / Read 시에 여러 옵션을 줄 수 있습니다. 파일 포맷 마다 지원되는 옵션은 다르며, 위의 코드에서는 예시를 위해 JSON Write 시에 gzip 압축 옵션을 넣었습니다.

Write 시에는 mode 옵션에 주의해야 합니다.

  • "append" 는 기존 데이터에 덧붙입니다. File 의 경우에는 추가 파일이 만들어지므로, 의도하지 않았을 경우에는 중복이 발생합니다.

  • "overwrite": 는 기존 데이터를 삭제하고 새로 적재합니다. 데이터에 문제가 있거나 등의 이유로 재적재 하는 경우에만 이 옵션을 사용합니다. Default 로 지정해 놓을 경우 실수로 기존 데이터를 삭제할 수 있으니 주의해야 합니다.

  • "error": 데이터가 이미 존재할 경우 Error (Exception) 이 발생합이다. 대부분의 경우 기본 설정으로 이 값을 사용하는 편이 낫습니다.

Write 모드 중 Update / Upsert 는 존재하지 않습니다. 데이터를 업데이트하기 위해

  • Iceberg (rewrite)

  • Apache Hud (upsert)i / Delta Lake / Hive ACID 등을 사용할 수 있습니다.

이제 만들어진 데이터를 읽어보겠습니다. spark.read.format("json") 을 통해 읽는 것이 가능합니다.

데이터의 숫자도 Write 한 것과 동일하고, 컬럼이름도 올바른 것을 확인할 수 있습니다.

Spark 는 JSON 파일을 읽으면서 각 컬럼의 타입을 추론합니다. 컬럼 타입을 정하려면 각 컬럼별로 전체 값을 읽어야 합니다. 따라서 파일이 매우 크고 이미 타입을 알고 있다면 추론을 Spark 가 하는 대신 primitivesAsString 값을 이용해 String 으로 값을 세팅하고 cast() 등의 함수를 이용해 직접 타입을 지정하면 추론에 들어가는 시간을 줄일 수 있습니다.

CSV Write 또한 같은 방법으로 가능합니다. 이번에는 repartition(1) 을 지정하면 파일이 1개만 생성되고 압축을 하지 않았으므로 이전보다 사이즈가 커진것도 볼 수 있습니다 (1.9M -> 6.6M)

일반적으로는 데이터를 로컬에 파일로 저장하지 않고 AWS S3, GCP GCS, HDFS 등에 저장합니다.이 때 시스템의 호환성 / 운영 용이성 등을 위해 데이터를 일별 /시간별로 나누어 다음과 같은 경로처럼 저장합니다.

  • s3://udon-data-prod/db/udon-service/udon-order/2021/11/01

  • s3://udon-data-prod/db/udon-service/udon-order/2021/11/02

  • s3://udon-data-prod/log/udon-service/udon-client/2021/11/01/00

  • s3://udon-data-prod/log/udon-service/udon-client/2021/11/01/01

1일치 데이터만 처리한다면 2021/11/01 경로의 데이터를 읽을 수 있습니다. 만약 2일치 데이터를 Spark 를 이용해 각 경로의 데이터를 범위로 있겠지만, 일별로 데이터를 읽는게 편하진 않습니다.

테이블처럼 데이터를 쓸 수 있게 해주는 메타스토어는 다른 챕터에서 다시 논의해 보겠습니다.

Parquet - Columnar Format

데이터를 빠르게 가공하기 위해선 CPU, Memory 와 같은 리소스와 Spark 같은 분산 처리 프레임워크도 중요하지만 데이터가 어디에 어떻게 저장되어 있는지도 굉장히 중요한 요소입니다.

이 섹션에서는 Parquet, ORC 와 같은 Columnar Format 에 대해서 알아보겠습니다.

왜 Parquet, ORC 같은 Columnar Format 이 필요할까요? 2013년에 발표된 Parquet: Columnar Storage for The People 슬라이드에서는 Columnar Format 의 장점을 크게 4가지로 이야기 하고 있습니다.

  • (Column Pruning) 대부분의 경우 데이터는 몇몇 Column 단위로 이용되므로 (SELECT b, d) Column 기준으로 데이터를 읽을 수 있다면 IO 등 물리 비용을 줄일 수 있습니다.

  • (Column Encoding) Column 중심으로 데이터를 저장할 경우 타입에 특화된 인코딩을 통해 저장 공간을 줄일 수 있습니다

    • 예를 들어 udon_department 란 컬럼이 있고 값으로 development / product_owner 와 같은 ENUM 값이 있다면 development = 1 로 저장하는 것이 가능합니다. (Parquet Encoding)

  • (Predicate PushDown) 데이터를 Spark 로 가져오기 전 미리 필터링이 가능해 네트워크 비용을 포함한 데이터 가공 비용을 크게 줄일 수 있습니다.

  • 마지막으로 컴퓨팅 프레임워크 구현에 따라 벡터화된 연산이 가능합니다.

Parquet File Format Overview (http://peter-hoffmann.com/2020/understand-predicate-pushdown-on-rowgroup-level-in-parquet-with-pyarrow-and-python.html)

Parquet 파일 포맷으로 데이터를 저장하면, 내부적으로는 Row 를 묶어 Row Group 을 만들어 저장합니다. Row Group 내의 각 컬럼별로 통계치를 가질 수 있습니다.

  • Column Pruning (혹은 Projection Pushdown) 을 이용해 필요한 컬럼 값만 뽑아내고

  • Predicate Pushdown 을 이용해 Row 그룹 내 값을 미리 필터링 할 수 있습니다.

    • WHERE A < 2 라는 조건이 있다면, Row Group 2 번은 읽지 않을 수 있습니다.

Column Pruning + Predicate Pushdown Overview (https://dkharazi.github.io/blog/parquet)
Parquet Format Overview (https://dkharazi.github.io/blog/parquet)

하나의 Row Group 내에는 Column Chunk 가 여러개 존재하며, Column Chunk 는 다시 여러개의 Page 로 구성됩니다. Parquet 는 파일의 Footer Metadata 내에 Column 의 통계 정보를 가지고 있습니다.

Spark 에서 Parquet 사용은 CSV / JSON 과 동일합니다.

우선 Parquet 파일을 만들어 봅시다. DataFrame 을 Write 해보면

압축을 Snappy 로 한 것 이외에는 CSV, JSON 과 사용법이 동일합니다. 이제 만들어진 Parquet 파일을 읽어서 DataFrame 으로 만들어 보겠습니다.

파티션 값이 1임을 알 수 있습니다. spark.read 로 파일을 읽을 경우에는 파일 사이즈나 숫자에 의해 동적으로 Partition 숫자가 spark.default.parallelism (default = 200) 값에 따라 조절됩니다.

만약 파일 숫자가 늘어난다면 Partition 숫자도 늘어나지 않을까요?

만약 파일 숫자와 파티션 숫자가 다르다면 왜 다른걸까요?

Spark 에서 DataFrame 으로 파일을 읽을 때 적용되는 옵션을 다음 문서에서 찾아봅시다.

Parquet (또는 ORC 등) 와 같이 다양한 언어 (Java, Python) 및 프레임워크 (Spark, Dask) 를 지원하는 경우에는 호환성을 테스트 해보는 편이 좋습니다.

  • 예를 들어 Parquet 로 데이터를 저장한다고 가정하면

  • Scala Spark 에서 저장한 파일을 Hive, Presto, Dask 등에서 읽을 수 있는지

  • 거꾸로 Python Dask 에서 저장한 파일을 반대로 Hive, Presto, Spark 등에서 읽을 수 있는지

  • 사용하는 타입이 언어 및 프레임워크 별로 전부 호환이 되는지

사내에서 사용하는 도구에 대해 다양하게 테스트를 해보는 편이 낫습니다. Parquet 스펙이 지원한다 하더라도, 언어나 실제 구현체에서 지원하지 않을 수 있기 때문입니다.이런 이유로 타입의 경우에는 데이터 사이즈에서 미미한 손해를 보더라도, 호환성을 위해 큰 타입을 사용하는 편이 낫습니다. (INT32 -> Long 등)

타입을 잘못 맞추면 고통을 겪을 수 있습니다. 예를 들어, Spark 2.4 이하와 Hive, Impala 등에서는 Parquet 파일 저장시 Timestamp 타입을 INT96 (12 bytes) 으로 지정합니다. 그러나 Parquet 에서는 INT96 은 공식적으로 지원하진 않습니다.

따라서 Spark 에서는 Parquet 의 INT96 을 읽을 때 Timestamp 타입으로 자동으로 전환하며 spark.sql.parquet.int96AsTimestamp 옵션을 통해 변환될 타입을 지정할 수 있습니다.

Spark 3.0 이상에서는 spark.sql.parquet.outputTimestampType 를 통해 Write 시 Timestamp 값을 위한 Parquet 타입값을 지정할 수 있으며 기본으로는 TIMESTAMP_MICROS (Parquet 의 INT64 타입) 형식으로 저장합니다.

Avro - Data Serialization

Apache Avro 는 데이터 직렬화 도구입니다. Java, Pyton 은 물론 Ruby, C, C# 등 다양한 언어에서 활용할 수 있으며 주로 Kafka Schema Registry 에서 많이 활용되곤 합니다.

JSON 을 전송하고 받아서 사용하는것에 비해 Avro, Protobuf, Thrift 와 같은 언어 중립적인 데이터 직렬화 시스템을 사용하는데는 몇 가지 이유가 있습니다.

  • 언어 중립적이므로 어떤 언어를 사용하는지 상관없이 직렬화 도구가 지원한다면 다양한 언어로 데이터를 읽고 쓸 수 있습니다. (e.g, Kafka Producer & Consumer)

  • 스키마를 미리 정의하고 사용하므로 실수로 변경될 부분을 방지하고 미래의 변경도 기존 소비자 (Consumer 등) 를 위한 스키마 호환을 (Avro Schema Evoluation) 지원합니다.

  • 타입이 정해져 있으므로 Encoding 을 통한 사이즈 축소 및 효율적인 조회 (Columnar 의 경우 Colume Pruning, Predicate Pushdown 등) 이 가능합니다.

  • Thrift, ProtoBuf (gRPC) 와 같은 프레임워크는 RPC API 를 제공하기도 합니다.

많은 경우에 다음과 같이 사용합니다.

  • S3, HDFS 등에 저장된 대규모의 / 읽기를 위한 데이터는 Parquet, ORC 등 읽기 및 압축 효율이 좋은 Columnar 포맷을 사용합니다. 읽기가 빈번하고 사이즈가 매우 커질 수 있기 때문입니다.

  • Kafka / API 등 실시간으로 데이터를 주고 받는 경우에는 Avro 처럼 Schema 관리가 편리하거나 (Registry 등 도구 제공) Protobuf w/ gRPC 처럼 통신을 위한 라이브러리가 지원되는 포맷을 활용하는 경우가 많습니다.

물론 장점만 있지는 않습니다. 멀쩡한 데이터를 Header (Schema / Meta) + Payload (Data) 로 변경하고 인코딩을 수행해야 하므로 CPU 리소스를 태웁니다. 다만 사이즈는 작아질 수 있기에 네트워크 비용에서 이점을 볼 수 있습니다.

Avro 를 사용하기 위해서는

  • Data Serialization 포맷은 IDL (Interface Definition Language) 이라 불리는 스키마 파일을 작성합니다

  • Protobuf / Thrift / Avro 의 경우에는 IDL 을 바탕으로 코드를 생성할 수 있습니다.

아래는 Protobuf 의 IDL 샘플 입니다. Avro 도 비슷하게 스키마를 작성할 수 있습니다. 다만 지원되는 문법과 타입이 다를 수 있습니다.

Avro 의 경우에는 다른 프레임워크와 다르게, Data 자체에 Schema 를 포함합니다.

  • 따라서 코드 생성을 통해 빌드 타임에 데이터 클래스를 생성할 수 있지만 (Avro SpecificRecord)

  • 런타임에도 Deserialization 을 수행할 수 있습니다. 다만 런타임에 예외가 발생할 수 있습니다. (Avro GenericRecord)

  • 피치 못한 경우가 아니라면 빌드 타임에 데이터 클래스를 생성해 사용하는 편이 낫습니다.

Avro 는 주로 Kafka 와 함께 활용됩니다.

  • Avro 로 Serialized 된 데이터를 Kafka Broker 로 직접 보내고 읽을 수 있습니다.

  • 만약 Schema Registry 를 이용하면 업타임에 Producer / Consumer 에서 스키마를 등록하거나 받아올 수 있습니다.

  • Confluent Schema Registry 의 경우 Avro 뿐만 아니라 Protobuf, JSON 형식의 스키마도 지원합니다.

Confluent Kafka Schema Registry Overview (https://docs.confluent.io/platform/current/schema-registry/index.html)

Avro 스키마는 일반적으로 Schema Registry 또는 .avsc (스키마 레지스트리 사용하지 않을 경우) 를 통해 포맷을 정한 후 Read / Write 를 수행합니다. 아래의 코드에서는, 예시를 위해 .avsc 파일이 있다고 가정하고 그 내용을 바탕으로 DataFrame Read / Write 를 수행합니다.

Scala Spark 에서는 abris 라이브러리를 통해 Registry / Raw Avro Format 섞어서 사용하고 DataFrame Schema 로 쉽게 전환할 수 있습니다.

이제 dfListings 에 맞추어 스키마를 만들고 Avro 파일을 저장해보겠습니다.

Schema Compatibility

Confluent Schema Registry (Link)

Avro 와 같은 Serialization Format 을 사용할 경우 일반적으로 Schema Evoluation 이라는 컨셉이 있습니다. 즉, 스키마가 변경될 경우 (필드 변경, 추가, 삭제) 를 어떻게 처리할 것인가를 정합니다.

직렬화 포맷이나 스키마 레지스트리 구현에 따라 지원되는 Schema Evolution 이나 용어가 다르지만 일반적으로는 크게 2가지로 나눌 수 있습니다. (X-2, X-1, X 순으로 스키마 버전이 업그레이드)

  • BACKWARD: X 버전을 사용하는 컨슈머가 X-1 을 버전의 데이터 처리 가능 (신규 버전으로 읽어 들이는데, 과거 버전으로 생성된 데이터를 = 과거의 스키마)

  • FORWARD: X-1 버전을 사용하는 컨슈머가 X 를 버전의 데이터 처리 가능 (과거 버전으로 읽어 들이는데, 신규 버전으로 생성된 데이터를 = 미래의 스키마)

이제 다른 버전의 스키마를 만들고 과거 버전의 Avro 를 읽어보겠습니다.

Avro 파일 내에는 데이터가 여전히 존재하기 때문에 과거 버전의 스키마를 사용한다면 'listing_name" 을 읽어서 쓸 수 있습니다.

Apache Hudi - Concept

Apache Hudi 의 경우에는 Parquet 와 Avro 를 섞어 쓰는 경우가 있습니다. Hudi 를 이용해 Write 시에 Merge on Read 모드를 이용하면

  • Avro File 을 Append 하고 추후에 많아진 Avro 파일을 Parquet 로 Compaction 합니다.

  • Snapshot Query 로 읽을때는 Parquet + Avro 를 둘다 읽어, 실시간 데이터를 볼 수 있습니다.

JDBC (MySQL, Postgres 등)

Generic Load / Save 함수를 이용하면 JDBC 드라이버를 이용해 MySQL 과 같은 RDB 에 데이터를 넣을 수 있습니다.

RDB 는 Spark 사용할 수 있는 주요 데이터 저장소 중 하나입니다. 데이터를 가져오는 주된 Source, API 등에서 활용하기 위한 서비스 데이터의 Sink 로서도 활용합니다.추후 다른 실습 챕터에서 데이터를 활용해 통계 데이터를 만들어 보고, 이번 챕터에서는 기본적인 사용법과 주의사항에 대해 알아봅니다. 우선 두 가지를 준비해야 합니다. MySQL (혹은 MySQL-compatible Aurora)

  • MySQL 내에 테이블을 만들어야 하고

  • Spark 가 쓸 수 있도록 MySQL JDBC Driver 를 Spark Class Path 에 추가해주어야 합니다. (MySQL Driver, Kinesis 와 같이 특정 라이브러리들은 라이센스 및 의존성 확장 이슈로 포함되지 않는 경우가 종종 있습니다)

우선 MySQL JDBC Driver 를 Class Path 에 추가 해보겠습니다. 여러가지 방법이 있지만, 이 섹션에서는 --packages 옵션을 이용해 추가해보도록 하겠습니다.

Spark 에서는 --packages (Maven Pom) 을 이용해 의존성 Jar 전체를 (transitive dependency) 가져올 수 있습니다. 또는 Spark 가 실행되는 Class Path (SPARK_HOME/jars, 일반적으로는 커스텀 경로를 권장) 필요한 의존성 Jar 를 미리 다운받아 로딩할수 있습니다.

두 가지 로딩 방법의 차이점은 무엇일까요? 만약 의존성이 많을 경우를 가정하여 논의해 봅시다.

다음은 MySQL 에서 실습에 사용할 테이블을 만드는 DDL 입니다.

Spark 실행시 --packages 옵션을 통해 MySQL Driver 를 다운받았고 DDL 을 실행했다면 이제 다음의 코드를 실행해 봅시다.

실행하면 아래와 같은 오류를 볼 수 있습니다.

이는 위에서 만든 MySQL 테이블 pipeline.ListingMetalisting_name 컬럼이 VARCHAR(200) 타입이고 실제 데이터는 이 타입에 들어갈 수 있는 제한 길이 보다 더 길기 때문입니다. 두 가지 방법으로 해결할 수 있습니다

  1. RDB listing_name 컬럼을 TEXT 타입으로 변환합니다 (ALTER TABLE pipeline.ListingMeta MODIFY COLUMN listing_name TEXT;)

  2. Spark 의 DataFrame 의 listing_name 컬럼 값의 길이를 자릅니다.

일반적으로 원본 데이터를 복원할 수 없도록 훼손하는 경우는 거의 없습니다. 다만 이 챕터에서는 Spark DataFrame 의 사용법을 익히는 것이 목적이므로 listing_name 컬럼 내 값을 잘라 적재하겠습니다.

listing_name_len 컬럼은 저장하지 않으므로 제거하겠습니다.

이제 JDBC Driver 를 이용해 DataFrame 을 저장해보겠습니다.

MySQL Client (mycli, DataGrip 등) 을 이용해 접속해서 데이터를 살펴보면, 잘 적재되었음을 알 수 있습니다.

여기서 꼭 집고 넘어가야 JDBC Write 설정이 있습니다.

  • mode 는 기존에 File 을 Write 하는것과 매우 다르게 동작합니다. "overwrite" 를 사용하면 기본설정으로는 Table 을 Drop 하고 DataFrame 의 Schema 를 기반으로 테이블을 다시 생성합니다. 따라서 Index 등 중요한 설정 정보가 삭제될 수 있어서 매우 유의해 사용해야 합니다.

  • truncate = True 로 설정시 mode = overwrite 라 하더라도 테이블은 삭제하지 않고 데이터만 삭제합니다. 그러나 여전히 데이터가 통채로 삭제될 수 있어 서비스 중인 DB 에서는 문제가 될 수 있습니다.

  • numPartitions 값은 JDBC 커넥션 숫자입니다. 위의 예제에서는 repartition(3) 이고 numPartitions(2) 인 경우이므로, coalesce(2 = numPartitions) 를 통해서 파티션 숫자를 줄인 후 Executor 마다 JDBC Connection 1개씩을 할당해 Write 합니다.

일반적으로는 mode = append 를 이용하고 Update 나 Upsert 가 필요할 경우 다른 방식으로 구현합니다. (foreach 내 직접 Statement 실행 등)

이제 JDBC Driver 를 이용해 MySQL 로 적재한 데이터를 다시 읽어 보겠습니다.

JDBC Driver 를 이용한 READ 를 수행할 때 사용한 설정들을 알아봅시다.

  • partitionColumn 에 명시된 컬럼을 이용해 numPartitions 숫자만큼 분산처리할 수 있습니다. lowerBound, upperBound 는 각 파티션당 얼마나 많은 데이터를 가져올지 분할량 (파티셔닝) 을 결정합니다.

  • partitionColumn 컬럼 타입은 RA numeric (숫자), date, timestamp 값만 가능합니다.

  • customSchema 를 이용하면 컬럼별 스키마를 지정할 수 있습니다. Spark 는 RDB 스키마를 Spark 호환 타입으로 변경하나, 일부 지원되지 않는 타입이나 잘못 변경될 경우 사용자는 customSchema 옵션을 이용하거나 직접 DataFrame 에서 cast() 함수를 사용할 수 있습니다.

  • fetchsize 는 너무 작으면 여러번 네트워크 통신이 필요하고 너무 많으면 DB 에 부하가 갈 수 있습니다.

위의 코드에서는 전체 컬럼이 약 4800 개이고, 파티션당 최대 1000 이므로 (1000, 1000, 1000, 1000, 약 800개) 로 파티션 당 Row 가 위치하길 바라지만, lowerBound, upperBound 는 갯수가 아니라 값입니다. 따라서 실제로 실행된 결과는 의도와는 다를 수 있습니다. 파티션별 Row 숫자를 살펴보면,

Write 할 때 .option("upperBound", "10000000") 로 변경하면 다음과 같은 결과가 나옵니다.

이는 listing_id 값의 범위가, 우리가 지정한 lowerBound, upperBound 로는 해결할 수 없는 특정 구간에 몰려있기 때문입니다. listing_id 값의 범위를 다시 살펴보면 다음과 같습니다.

데이터 사이즈가 점점 늘어가는 테이블에서 고정된 upperBound 로 인해 어떤 문제가 나중에 발생할까요?

Q1. Executor 의 수를 고정하고 만약 partitionColumn 의 count, min, max 값을 바탕으로 매일 lowerBound, upperBound 를 계산하려면 어떻게 하면 될까요?

Q2. 만약 특정 partitionColumn 값 구간에 데이터가 몰린다면 어떻게 해야할까요?

DB 에 부하가 생기니 2021년 이후 데이터만 필터링 해서 로딩해주세요 등과 같은 요구사항이 있을 경우 WHERE 조건문을 이용해 추가적인 필터링이 필요할 수 있습니다. 다만 option 으로 지정하는 querypartitionColumn 과 같이 쓰일 수 없기 때문에 dbtable 옵션을 이용해 보겠습니다.

dfListingJdbc.count() 를 실행하면 기존보다 줄어든 4865 > 4851 개임을 알 수 있습니다.

아래의 옵션들도 정말 중요하니, Spark Docs - JDBC Data Source 문서에서 한번 꼭 읽어보시길 권장 드립니다.

  • (READ, WRITE) queryTimeout

  • (READ, WRITE) query

  • (READ) fetchsize

  • (READ) pushDownPredicate

  • (READ) pushDownAggregate

  • (WRITE) batchsize

  • (WRITE) isolationLevel

Practice

실습 과제입니다.

Airbnb 데이터 셋을 이용해 Spark 로 가공한 뒤 MySQL 에 적재합니다.

  1. airbnb_listings 데이터와 airbnb_reviews 데이터 를 합쳐 listing 별 리뷰 숫자 및 평균 평점을 가진 DataFrame 을 만들고, airbnb_stat_review 테이블에 적재합니다.

  2. airbnb_listings 데이터와 airbnb_calendar 데이터를 합쳐 listing 별 예약 숫자와 예약 금액 합 을 월별로 집계해 DataFrame 을 만들고 airbnb_stat_sales_month 테이블에 적재합니다.

컬럼 이름은 자유롭게 지어도 상관 없습니다.

  • 단, DataFrame 컬럼 타입은 LONG, DOUBLE, STRING을 사용하며 (월은 '2021-11' 과 같이 문자열로 표현합니다)

  • MySQL DDL 에서는 대응되는 BIGINT UNSIGNED, DOUBLE(10, 5), VARCHAR(N) / TEXT 를 사용합니다.

이 데이터를 Parquet 로 저장하거나 Hive Insert Into 를 통해 저장한다면 Hive 의 타입과 매칭이 됩니다. 아래 두가지 문서를 살펴보면서 어떤 타입으로 위 타입들이 매핑될지 고민해 봅시다.

실습을 위해 아래의 환경을 로컬에 구축해 놓으면 편리합니다.

  • Docker 를 이용해 MySQL 을 띄워 실행

  • DataGrip (https://www.jetbrains.com/datagrip/) 과 같은 데이터 베이스 IDE

  • Jupyter 세팅 및 PySpark 설정

    • 데이터를 개발할때 빌드 없이 빠르게 Spark Shell 을 활용해 작업해볼 수 있습니다.

  • Intellij (https://www.jetbrains.com/idea/) 내 Spark Gradle 프로젝트

    • 실제 프로덕션 배포시 사용할 코드를 프로젝트 형식으로 개발할 수 있습니다.

    • 완료되면 로컬에서 spark-submit 등의 커맨드를 이용해 Spark Local Mode 로 Jar 를 실행해볼 수 있습니다.

Summary

아래는 이번 챕터에서 다룬 핵심 키워드입니다.

  • Save Mode

  • Column Pruning

  • Predicate Pushdown

  • Snappy

  • Serialization

  • Schema Registry

  • Schema Evoluation

  • Avro SpecificRecord / GenericRecord

Last updated

Was this helpful?