서비스에 즉시 반영되어야 하는 Transaction 과는 상관없는 후처리 (분석 혹은 운영성 배치작업) 할 경우 S3 (또는 GCS 등) 에 저장되어 있는 Parquet 포맷을 읽어 데이터 가공을 진행한다면 RDB 에 부하를 주지 않으면서 저렴한 비용으로 (S3) 작업을 할 수 있습니다.
또한 RDB 에 있는 데이터가 뿐만 아니라 비즈니스 담당자가 구글 시트 등에서 관리하는 데이터도 CSV 로 만들어 S3 등에 올려둔다면 Join 후 가공하는 것도 가능합니다.
listing 은 AirBNB 의 상품인 숙소 (Property) 입니다. listing 마다 id 값이 부여되며, 이 listing_id 값을 이용해 아래에서 airbnb_calendar.csv 내의 숙소마다의 일별 가격과 같이 데이터를 보도록 하겠습니다.
JSON 파일을 읽는 방법도 동일합니다. 다만 현재는 JSON 파일이 없으므로, 위에서 읽은 airbnb_listings.csv 를 JSON 으로 Write 해본 후 다시 읽어보겠습니다.
listing 의 id, url, name, summary, description 만 SELECT 후 JSON 으로 Write 를 해보면
dfListingSelected = dfListing.selectExpr("id as listing_id","listing_url", "name as listing_name", "summary as listing_summary", "description as listing_desc")
# 2개의 Partition 으로 만든 뒤, JSON 포맷으로 GZIP 압축하여 # airbnb_listings 디렉토리에 (Jupyter Notebook 의 경우 현재 ipynb 노트북이 위치한 디렉토리)# Overwrite (덮어쓰기) 저장합니다dfListingSelected\.repartition(2)\.write\.mode("overwrite")\.format("json")\.option("compression", "gzip")\.save("airbnb_listings")
ls -alh airbnb_listings/
total 1.9M
part-00000-850995d8-9964-4f8c-919f-60abaa20d45b-c000.json.gz
.part-00000-850995d8-9964-4f8c-919f-60abaa20d45b-c000.json.gz.crc
part-00001-850995d8-9964-4f8c-919f-60abaa20d45b-c000.json.gz
.part-00001-850995d8-9964-4f8c-919f-60abaa20d45b-c000.json.gz.crc
_SUCCESS
._SUCCESS.crc
Repartition 값을 2개로 지정해 2개의 파일이 생성된것을 볼 수 있습니다. 위의 예시에서는 파일이 너무 작아 사실 나눌 필요는 없으나, 예시를 위에서 Partition 값을 1보다 크게 만들어 파일을 나누었습니다.
"append" 는 기존 데이터에 덧붙입니다. File 의 경우에는 추가 파일이 만들어지므로, 의도하지 않았을 경우에는 중복이 발생합니다.
"overwrite": 는 기존 데이터를 삭제하고 새로 적재합니다. 데이터에 문제가 있거나 등의 이유로 재적재 하는 경우에만 이 옵션을 사용합니다. Default 로 지정해 놓을 경우 실수로 기존 데이터를 삭제할 수 있으니 주의해야 합니다.
"error": 데이터가 이미 존재할 경우 Error (Exception) 이 발생합이다. 대부분의 경우 기본 설정으로 이 값을 사용하는 편이 낫습니다.
Write 모드 중 Update / Upsert 는 존재하지 않습니다. 데이터를 업데이트하기 위해
Iceberg (rewrite)
Apache Hud (upsert)i / Delta Lake / Hive ACID 등을 사용할 수 있습니다.
이제 만들어진 데이터를 읽어보겠습니다. spark.read.format("json") 을 통해 읽는 것이 가능합니다.
# 디렉토리 내의 분할된 모든 JSON 파일을 다 읽습니다.dfListingJson = spark.read.format("json").load("./airbnb_listings")dfListingJson.count()dfListingJson.rdd.getNumPartitions()dfListingJson.printSchema(truncate=True)
4865 # dfListingJson.count()
2 # dfListingJson.rdd.getNumPartitions()
# dfListingJson.printSchema(truncate=True)
+----------+--------------------+--------------------+--------------------+--------------------+
|listing_id| listing_url| listing_name| listing_summary| listing_desc|
+----------+--------------------+--------------------+--------------------+--------------------+
| 360|https://www.airbn...|LoHi Secret garde...|Come enjoy our oa...|Come enjoy our oa...|
| 590|https://www.airbn...|Comfortable - an...|Large guest room ...|Large guest room ...|
| 592|https://www.airbn...| private|This room is in t...|This room is in t...|
| 1940|https://www.airbn...|Baker Studio Clos...|Great place for a...|Great place for a...|
| 2086|https://www.airbn...| Garden Level Condo|A furnished, gard...|A furnished, gard...|
| 31503|https://www.airbn...|Highland Park Gue...| null|Highland Park Gue...|
| 39405|https://www.airbn...|LoHi Secret garde...|Come enjoy our oa...|Come enjoy our oa...|
| 56185|https://www.airbn...|charming home for...| null|Spend time in Den...|
| 59631|https://www.airbn...|VICTORIAN TOWNHOM...|License #2017-BFN...|License #2017-BFN...|
| 74125|https://www.airbn...|Spacious Cap Hill...|1000' entire-firs...|1000' entire-firs...|
| 81540|https://www.airbn...|Affordable S. Den...|Bright, sunny 1 b...|Bright, sunny 1 b...|
| 90307|https://www.airbn...|Comfy King Size R...| null|This private bedr...|
| 98008|https://www.airbn...|Beautiful sun fil...|Locaton, location...|Locaton, location...|
| 98014|https://www.airbn...|Beautiful single ...| null|Hi Folks! Welcom...|
| 142683|https://www.airbn...|Historic Denver C...| null|One of three cond...|
| 172196|https://www.airbn...|Luxury Wash Park ...|Remodeled wash pa...|Remodeled wash pa...|
| 184529|https://www.airbn...|HIP SUITE IN WES...|Private SUIITE ...|Private SUIITE ...|
| 192430|https://www.airbn...|TREETOP VIEW ROOM...|Located in the de...|Located in the de...|
| 217996|https://www.airbn...| Highland Snug|Comfortable and c...|Comfortable and c...|
| 236207|https://www.airbn...|Denver Penthouse ...|Important Note : ...|Important Note : ...|
+----------+--------------------+--------------------+--------------------+--------------------+
데이터의 숫자도 Write 한 것과 동일하고, 컬럼이름도 올바른 것을 확인할 수 있습니다.
Spark 는 JSON 파일을 읽으면서 각 컬럼의 타입을 추론합니다. 컬럼 타입을 정하려면 각 컬럼별로 전체 값을 읽어야 합니다. 따라서 파일이 매우 크고 이미 타입을 알고 있다면 추론을 Spark 가 하는 대신 primitivesAsString 값을 이용해 String 으로 값을 세팅하고 cast() 등의 함수를 이용해 직접 타입을 지정하면 추론에 들어가는 시간을 줄일 수 있습니다.
CSV Write 또한 같은 방법으로 가능합니다. 이번에는 repartition(1) 을 지정하면 파일이 1개만 생성되고 압축을 하지 않았으므로 이전보다 사이즈가 커진것도 볼 수 있습니다 (1.9M -> 6.6M)
$ ls -alh airbnb_listings_selected
total 6.6M
part-00000-965313d2-2a07-49e4-b7a1-9c7d9a43129d-c000.csv
.part-00000-965313d2-2a07-49e4-b7a1-9c7d9a43129d-c000.csv.crc
_SUCCESS
일반적으로는 데이터를 로컬에 파일로 저장하지 않고 AWS S3, GCP GCS, HDFS 등에 저장합니다.이 때 시스템의 호환성 / 운영 용이성 등을 위해 데이터를 일별 /시간별로 나누어 다음과 같은 경로처럼 저장합니다.
Parquet (또는 ORC 등) 와 같이 다양한 언어 (Java, Python) 및 프레임워크 (Spark, Dask) 를 지원하는 경우에는 호환성을 테스트 해보는 편이 좋습니다.
예를 들어 Parquet 로 데이터를 저장한다고 가정하면
Scala Spark 에서 저장한 파일을 Hive, Presto, Dask 등에서 읽을 수 있는지
거꾸로 Python Dask 에서 저장한 파일을 반대로 Hive, Presto, Spark 등에서 읽을 수 있는지
사용하는 타입이 언어 및 프레임워크 별로 전부 호환이 되는지
사내에서 사용하는 도구에 대해 다양하게 테스트를 해보는 편이 낫습니다. Parquet 스펙이 지원한다 하더라도, 언어나 실제 구현체에서 지원하지 않을 수 있기 때문입니다.이런 이유로 타입의 경우에는 데이터 사이즈에서 미미한 손해를 보더라도, 호환성을 위해 큰 타입을 사용하는 편이 낫습니다. (INT32 -> Long 등)
더 세밀하게 사용할수록, 더 많은 문제를 마주칠 확률이 높습니다. 이른 최적화는 약이 아니라 독일 가능성이 많습니다.
타입을 잘못 맞추면 고통을 겪을 수 있습니다. 예를 들어, Spark 2.4 이하와 Hive, Impala 등에서는 Parquet 파일 저장시 Timestamp 타입을 INT96 (12 bytes) 으로 지정합니다. 그러나 Parquet 에서는 INT96 은 공식적으로 지원하진 않습니다.
따라서 Spark 에서는 Parquet 의 INT96 을 읽을 때 Timestamp 타입으로 자동으로 전환하며 spark.sql.parquet.int96AsTimestamp 옵션을 통해 변환될 타입을 지정할 수 있습니다.
Spark 3.0 이상에서는 spark.sql.parquet.outputTimestampType 를 통해 Write 시 Timestamp 값을 위한 Parquet 타입값을 지정할 수 있으며 기본으로는 TIMESTAMP_MICROS (Parquet 의 INT64 타입) 형식으로 저장합니다.
Since Spark 3.0, parquet logical type TIMESTAMP_MICROS is used by default while saving TIMESTAMP columns. In Spark version 2.4 and earlier, TIMESTAMP columns are saved as INT96 in parquet files. To set INT96 to spark.sql.parquet.outputTimestampType restores the previous behavior.
Avro 로 Serialized 된 데이터를 Kafka Broker 로 직접 보내고 읽을 수 있습니다.
만약 Schema Registry 를 이용하면 업타임에 Producer / Consumer 에서 스키마를 등록하거나 받아올 수 있습니다.
Confluent Schema Registry 의 경우 Avro 뿐만 아니라 Protobuf, JSON 형식의 스키마도 지원합니다.
Avro 로 직렬화 해 Binary 로 직접 보내는것과 Schema Registry 를 이용해 직렬화 보내는것은 Broker 에 전송되는 내용물이 다르므로 호환되지 않습니다. 사용시 주의해야 합니다.
Avro 스키마는 일반적으로 Schema Registry 또는 .avsc (스키마 레지스트리 사용하지 않을 경우) 를 통해 포맷을 정한 후 Read / Write 를 수행합니다.
아래의 코드에서는, 예시를 위해 .avsc 파일이 있다고 가정하고 그 내용을 바탕으로 DataFrame Read / Write 를 수행합니다.
Scala Spark 에서는 abris 라이브러리를 통해 Registry / Raw Avro Format 섞어서 사용하고 DataFrame Schema 로 쉽게 전환할 수 있습니다.
이제 dfListings 에 맞추어 스키마를 만들고 Avro 파일을 저장해보겠습니다.
# 일반적으로는 Schema Registry 또는 .avsc 파일을 읽어 스키마를 사용합니다. # 예시를 위해 String 으로 만든 Schema 를 사용합니다.schemaAvroListingV1 ="""{ "type": "record", "name": "AirbnbListing", "namespace": "com.airbnb", "fields": [ {"name": "listing_id", "type": "int"}, {"name": "listing_url", "type": "string"}, {"name": "listing_name", "type": "string"}, {"name": "listing_summary", "type": ["string", "null"]}, {"name": "listing_desc", "type": ["string", "null"]} ]}"""# Avro 파일 포맷으로 저장할때 옵션을 통해 Schema 를 지정합니다.dfListingSelected\.repartition(2)\.write\.mode("overwrite")\.format("avro")\.option("avroSchema", schemaAvroListingV1)\.save("./airbnb_listings_avro")
Schema Compatibility
Avro 와 같은 Serialization Format 을 사용할 경우 일반적으로 Schema Evoluation 이라는 컨셉이 있습니다.
즉, 스키마가 변경될 경우 (필드 변경, 추가, 삭제) 를 어떻게 처리할 것인가를 정합니다.
Avro 파일 내에는 데이터가 여전히 존재하기 때문에 과거 버전의 스키마를 사용한다면 'listing_name" 을 읽어서 쓸 수 있습니다.
# 과거 버전의 스키마로 (v1) 과거 버전으로 쓰여진 (V1) 데이터를 읽습니다.dfListingAvroV1 = spark.read.format("avro")\.option("avroSchema", schemaAvroListingV1)\.load("/FileStore/raw/airbnb_listings_avro")dfListingAvroV1.printSchema()
Apache Hudi 의 경우에는 Parquet 와 Avro 를 섞어 쓰는 경우가 있습니다. Hudi 를 이용해 Write 시에 Merge on Read 모드를 이용하면
Avro File 을 Append 하고 추후에 많아진 Avro 파일을 Parquet 로 Compaction 합니다.
Snapshot Query 로 읽을때는 Parquet + Avro 를 둘다 읽어, 실시간 데이터를 볼 수 있습니다.
JDBC (MySQL, Postgres 등)
Generic Load / Save 함수를 이용하면 JDBC 드라이버를 이용해 MySQL 과 같은 RDB 에 데이터를 넣을 수 있습니다.
RDB 는 Spark 사용할 수 있는 주요 데이터 저장소 중 하나입니다. 데이터를 가져오는 주된 Source, API 등에서 활용하기 위한 서비스 데이터의 Sink 로서도 활용합니다.추후 다른 실습 챕터에서 데이터를 활용해 통계 데이터를 만들어 보고, 이번 챕터에서는 기본적인 사용법과 주의사항에 대해 알아봅니다.
우선 두 가지를 준비해야 합니다. MySQL (혹은 MySQL-compatible Aurora)
MySQL 내에 테이블을 만들어야 하고
Spark 가 쓸 수 있도록 MySQL JDBC Driver 를 Spark Class Path 에 추가해주어야 합니다. (MySQL Driver, Kinesis 와 같이 특정 라이브러리들은 라이센스 및 의존성 확장 이슈로 포함되지 않는 경우가 종종 있습니다)
우선 MySQL JDBC Driver 를 Class Path 에 추가 해보겠습니다. 여러가지 방법이 있지만, 이 섹션에서는 --packages 옵션을 이용해 추가해보도록 하겠습니다.
Spark 에서는 --packages (Maven Pom) 을 이용해 의존성 Jar 전체를 (transitive dependency) 가져올 수 있습니다. 또는 Spark 가 실행되는 Class Path (SPARK_HOME/jars, 일반적으로는 커스텀 경로를 권장) 필요한 의존성 Jar 를 미리 다운받아 로딩할수 있습니다.
두 가지 로딩 방법의 차이점은 무엇일까요? 만약 의존성이 많을 경우를 가정하여 논의해 봅시다.
다음은 MySQL 에서 실습에 사용할 테이블을 만드는 DDL 입니다.
CREATEDATABASEpipeline;USE pipeline;CREATETABLE `ListingMeta`(-- primary key`listing_id`BIGINT UNSIGNED NOT NULLPRIMARY KEY,`listing_name`VARCHAR(200) NULL,`listing_desc`TEXTNULL,`listing_summary`TEXTNULL,`listing_url`TEXTNULL,-- FK columns-- common`created_at`datetimeDEFAULT CURRENT_TIMESTAMP NOT NULL) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4COLLATE= utf8mb4_unicode_ci;
Spark 실행시 --packages 옵션을 통해 MySQL Driver 를 다운받았고 DDL 을 실행했다면 이제 다음의 코드를 실행해 봅시다.
Job aborted due to stage failure: Task 0 in stage 60.0 failed 1 times, most recent failure: Lost task 0.0 in stage 60.0 (TID 115) (jupyter executor driver): java.sql.BatchUpdateException: Data truncation: Data too long for column 'listing_name' at row 1
이는 위에서 만든 MySQL 테이블 pipeline.ListingMeta 의 listing_name 컬럼이 VARCHAR(200) 타입이고 실제 데이터는 이 타입에 들어갈 수 있는 제한 길이 보다 더 길기 때문입니다. 두 가지 방법으로 해결할 수 있습니다
mode 는 기존에 File 을 Write 하는것과 매우 다르게 동작합니다. "overwrite" 를 사용하면 기본설정으로는 Table 을 Drop 하고 DataFrame 의 Schema 를 기반으로 테이블을 다시 생성합니다. 따라서 Index 등 중요한 설정 정보가 삭제될 수 있어서 매우 유의해 사용해야 합니다.
truncate = True 로 설정시 mode = overwrite 라 하더라도 테이블은 삭제하지 않고 데이터만 삭제합니다. 그러나 여전히 데이터가 통채로 삭제될 수 있어 서비스 중인 DB 에서는 문제가 될 수 있습니다.
numPartitions 값은 JDBC 커넥션 숫자입니다. 위의 예제에서는 repartition(3) 이고 numPartitions(2) 인 경우이므로, coalesce(2 = numPartitions) 를 통해서 파티션 숫자를 줄인 후 Executor 마다 JDBC Connection 1개씩을 할당해 Write 합니다.
일반적으로는 mode = append 를 이용하고 Update 나 Upsert 가 필요할 경우 다른 방식으로 구현합니다. (foreach 내 직접 Statement 실행 등)
이제 JDBC Driver 를 이용해 MySQL 로 적재한 데이터를 다시 읽어 보겠습니다.
partitionColumn 에 명시된 컬럼을 이용해 numPartitions 숫자만큼 분산처리할 수 있습니다. lowerBound, upperBound 는 각 파티션당 얼마나 많은 데이터를 가져올지 분할량 (파티셔닝) 을 결정합니다.
partitionColumn 컬럼 타입은 RA numeric (숫자), date, timestamp 값만 가능합니다.
customSchema 를 이용하면 컬럼별 스키마를 지정할 수 있습니다. Spark 는 RDB 스키마를 Spark 호환 타입으로 변경하나, 일부 지원되지 않는 타입이나 잘못 변경될 경우 사용자는 customSchema 옵션을 이용하거나 직접 DataFrame 에서 cast() 함수를 사용할 수 있습니다.
fetchsize 는 너무 작으면 여러번 네트워크 통신이 필요하고 너무 많으면 DB 에 부하가 갈 수 있습니다.
위의 코드에서는 전체 컬럼이 약 4800 개이고, 파티션당 최대 1000 이므로 (1000, 1000, 1000, 1000, 약 800개) 로 파티션 당 Row 가 위치하길 바라지만, lowerBound, upperBound 는 갯수가 아니라 값입니다. 따라서 실제로 실행된 결과는 의도와는 다를 수 있습니다. 파티션별 Row 숫자를 살펴보면,
데이터 사이즈가 점점 늘어가는 테이블에서 고정된 upperBound 로 인해 어떤 문제가 나중에 발생할까요?
Q1. Executor 의 수를 고정하고 만약 partitionColumn 의 count, min, max 값을 바탕으로 매일 lowerBound, upperBound 를 계산하려면 어떻게 하면 될까요?
Q2. 만약 특정 partitionColumn 값 구간에 데이터가 몰린다면 어떻게 해야할까요?
DB 에 부하가 생기니 2021년 이후 데이터만 필터링 해서 로딩해주세요 등과 같은 요구사항이 있을 경우 WHERE 조건문을 이용해 추가적인 필터링이 필요할 수 있습니다. 다만 option 으로 지정하는 query 는 partitionColumn 과 같이 쓰일 수 없기 때문에 dbtable 옵션을 이용해 보겠습니다.
It is not allowed to specify query and partitionColumn options at the same time. When specifying partitionColumn option is required, the subquery can be specified using dbtable option instead and partition columns can be qualified using the subquery alias provided as part of dbtable.