Jupyter 에서 자동완성을 통해 DataFrame 이 지원하는 함수를 보면 'GlobalTempView' 라는것이 보입니다.
createTempView, createOrReplaceTempView 를 통해 만든 View 는 해당 Spark Session 내에서 사용이 가능합니다
하지만 Global 로 만들면 서로 다른 Spark Session 간 공유가 가능합니다.
SubQuery 이용하면 SELECT * FROM (SELECT ..) 와 같이 중첩되어 가독성이 떨어지고 유지보수가 힘듭니다. 아래 예제에서는 WITH 구문을 이용해 SubQuery 에 이름을 붙여 편리하고 읽기 쉽게 사용할 수 있습니다.
대부분의 SQL 에서 WITH 구문을 지원합니다. Spark SQL 은 Hive 와 호환되므로 Spark SQL 에서도 당연히 WITH 구문을 이용할 수 있습니다.
propertyTypeHouse = 'House'
dfListingHouseAvailable = spark.sql(f"""
WITH
LISTING_HOUSE AS (
SELECT id as listing_id, listing_url, name
FROM LISTING_META
WHERE property_type = '{propertyTypeHouse}'
)
SELECT
LISTING_CALENDAR.listing_id,
listing_url,
name as listing_name,
date,
available,
price
FROM LISTING_CALENDAR
INNER JOIN LISTING_HOUSE
ON LISTING_CALENDAR.listing_id = LISTING_HOUSE.listing_id
WHERE LISTING_CALENDAR.available = 't'
""")
# dfListingHouseAvailable.show()
+----------+--------------------+--------------------+----------+---------+-------+
|listing_id| listing_url| listing_name| date|available| price|
+----------+--------------------+--------------------+----------+---------+-------+
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-01| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-13| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-14| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-15| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-21| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-22| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-26| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-27| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-28| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-29| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-03| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-04| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-05| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-10| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-11| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-12| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-17| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-18| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-19| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-24| t|$130.00|
+----------+--------------------+--------------------+----------+---------+-------+
위에서 사용한 쿼리 예제에서는 LISTING_META (Product Meta) 와 LISTING_CALENDAR(Product Meta) 테이블을 조인했습니다.
Product Meta + Product Event (Transaction, Calendar, Activity) 조합은 앞으로 여러분들이 굉장히 많이 사용할 데이터 결합 패턴입니다.
상품의 속성은 일반적으로 별도 테이블로 존재하고 상품 이벤트에 비해 비교적 적은 Row 로 구성되어 있습니다
상품의 이벤트는 일반적으로 상품 메타에 비해 데이터 사이즈가 매우 큽니다
예를 들어, 상품은 1개여도 해당 상품에 대한 주문은 여러번 발생할 수 있습니다
도메인에 따라 상품 메타가 더 자세히 나뉘기도 합니다
커머스의 경우에는 상품 > 상품 옵션일수도 있습니다
여행의 경우에는 호텔 > 룸 > 레이트 플랜 (룸 + 옵션) > 날짜 와 같이 더 복잡한 형태로 구성되기도 합니다
위와 같이 상품 이벤트에 대해 상품 메타를 붙이는 경우도 있지만 많은 경우에 상품 메타를 축으로 잡고 집계 연산을 수행할 수 있습니다. (e.g, 통계 데이터 등)
dfListingAvailability = spark.sql("""
WITH LISTING_GROUPED AS (
SELECT
listing_id,
count(CASE WHEN available = 't' THEN 1 END) as count_date_available,
count(CASE WHEN available <> 't' THEN 1 END) as count_date_unavailable
FROM LISTING_CALENDAR
GROUP BY listing_id
)
SELECT
LISTING_META.id as listing_id,
LISTING_META.listing_url as listing_url,
LISTING_META.name as listing_name,
LISTING_META.review_scores_rating as review_scores_rating,
LISTING_META.price as listing_price_basic,
coalesce(LISTING_GROUPED.count_date_available, 0) as count_date_available,
coalesce(LISTING_GROUPED.count_date_unavailable, 0) as count_date_unavailable
FROM LISTING_META
LEFT JOIN LISTING_GROUPED
ON LISTING_META.id = LISTING_GROUPED.listing_id
""")
LISTING_CALENDAR 는 listing_id 당 중복되는 'date' 가 없다고 가정합니다. 만약 중복이 있다면 distinct 를 사용하거나 중복이 없는 상품 메타 단위로 더 세분화 해 집계할 수 있습니다
통계 등을 위한 조인 및 집계 관련해서는 추후에 다른 챕터에서 더 설명하도록 하겠습니다. 아래 그림을 통해 JOIN 종류에 따른 결과를 시각적으로 볼 수 있습니다.
spark.sql("""
CACHE TABLE LISTING_STAT AS (
SELECT *
FROM LISTING_AVAILABILITY
WHERE review_scores_rating IS NOT NULL
)
""")
spark.sql("SHOW TABLES").show()
2021년 10월 16일의 해당 테이블 데이터는 s3://udon-data-lake/db/udon_topping_sales/2021/10/16" 위치에 있고
미래에 그 위치가 변해도 "내가 알려줄거야" 라고 말한다면 어떨까요?
그 시스템이 바로 메타스토어 입니다. (메타 정보를 저장하고 있는 시스템)
Hive Metastore 는 Spark 와 자주 같이 쓰이곤 합니다. 아래 스크린샷은 Spark 에서 Hive Metastore 를 통해 RDS (MySQL 등) 에 있는 Table 및 Partition 의 정보를 읽어 S3 의 위치를 찾아 낸 뒤 Spark 를 통해 읽는 경우를 보여줍니다. 즉, Metastore 는 테이블의 실제 데이터인 '파일' 이 어디에 있는지만 저장하지 파일을 저장하진 않습니다.
아래 그림에서는 On-Prem 에서 Hadoop / Hive Metastore 에 대응되는 AWS 의 서비스를 보여줍니다.
Hadoop Cluster 는 Computing (Yarn) + Storage (HDFS) 으로 구성되는데
AWS 사용시 Storage (HDFS) 는 S3 로 대체하거나 EMR 내의 Core 노드로 HDFS 를 운영이 가능합니다
AWS 사용시 Computing 은 EMR 의 Core (AM) + Task 노드로 대체가 가능합니다
Metastore 서비스에 Database, Table, Partition 에 대한 정보를 넣게 되면 수 많은 시스템에서 데이터를 조회하는데 사용할 수 있습니다. 아래 그림은 Hive Metastore 를 통해 Presto, Spark SQL 등에서 S3 는 물론 다양한 Storage 로 접근하는 사례를 보여줍니다.
모든것을 Hive Metastore 에 등록할 필요는 없습니다. 모든 시스템이 Hive Metastore 와 통합되는것은 아니며
필요할 경우 Retool / Redash 등 UI 도구에서 저장소를 직접 등록해 조회할 수 있습니다.
Metastore 의 Provisioning 은 AWS / GCP 와 같은 클라우드를 쓴다면 어렵지 않습니다.
AWS 에서는 EMR 을 통해 Hive Metastore 서비스를 쉽게 띄울 수 있고 RDB 로 RDS 나 Aurora 를 사용할 수 있습니다.
AWS EMR 내의 Hive Metastore 대신 AWS 관리형 메타스토어인 AWS Glue Catalog 를 사용할 수 있습니다.
대부분의 기업들은 Spark 이외에도 Presto 와 같은 범용 쿼리 엔진을 운영합니다. 모든 사용자가 Spark 를 사용할 수 있는것은 아니기 때문입니다. 따라서 Hive Metastore 도 같이 운용되고 있습니다.
Hive Metastore 부분을 조금 더 살펴보면, 위 스크린샷과 같이 크게 3가지 운영 형태로 나눌 수 있습니다
Spark 는 아무런 설정을 하지 않을 경우 Embedded Metastore 로 동작합니다.
노트북 등에서 로컬 환경 테스팅용으로 사용할 수 있습니다.
만약 외부에 HMS 가 없고, Spark 에서 Embded Metastore 모드를 이용하되 RDB 만 외부 것을 이용하고 싶다면 Local Metastore 모드로 사용할 수 있습니다
공유되는 HMS 가 없어 운영 등 노동 투입의 여지가 적으나, Production 에는 권장되지 않습니다.
EMR 등에 이미 떠 있는 HMS (Hive Metastore Service) 값을 spark/conf/hive-site.xml 내에 hive.metastore.uris 프로퍼티로 세팅하면 Remote Metastore 모드로 동작합니다.
대부분의 Production 환경에서는 Remote Metastore 형태로 사용합니다. HMS 를 별도로 운영하고, Spark Client 들이 접속해 사용합니다.
# spark/conf/hive-site.xml
# Remote Hive Metastore 연결을 위한 Spark 설정
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hive-metastore-prod.udon.io:9083</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
</configuration>
# spark/conf/hive-site.xml
# Local Hive Metastore 사용을 위한 외부 RDB 연결 설정
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost/metastore_db?createDatabaseIfNotExist=true</value>
<description>metadata is stored in a MySQL server</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>MySQL JDBC driver class</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>USER</value>
<description>user name for connecting to mysql server </description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>PASSWORD</value>
<description>password for connecting to mysql server </description>
</property>
</configuration>
만약 Hive Metastore 대신 Glue Catalog 를 Spark 의 Metastore 로 사용한다면, 다음 문서의 가이드를 따라 각종 컴퓨팅 프레임워크의 메타스토어로 Glue 를 설정할 수 있습니다.
[실습] Embedded, Local Metastore 모드 w/ MySQL Docker
spark-warehouse
metastore_db
Table in Hive Metastore
이제 Metastore 에 대해 알아보았으니, Metastore 에 저장될 테이블을 생성해보겠습니다. 이 섹션에서는 Hive Metastore 를 사용합니다.
사용자 실습 환경에 따라 EMR 처럼 Remote External Hive Metatore 거나 로컬 내의 Embedded 된 HMS 일 수 있습니다.
View 는 논리적인 구조로서, 물리적으로 저장되어 있지 않아 사용할 때마다 쿼리나 Transformation 을 실행해 사용해야 합니다.
반면 Table 의 데이터는 CSV, JSON, Parquet 등 물리적으로 존재하는 데이터로서 읽어서 바로 사용할 수 있습니다.
Spark 는 다음의 Table / View 를 지원합니다. 전역 여부 및 관리 여부에 따라 구분되는데, 각각은 어떻게 다를까요?
Session-local Temporary View (Temporary Table)
Global Temporary View (CREATE GLOBAL TEMPORARY VIEW)
Global Permanent View (CREATE GLOBAL VIEW)
Global Managed Table
Global External (Unmanaged) Table
Hive 에서 Table 은 크게 2가지 종류로 나눌 수 있습니다. Hive 이야기를 하는 이유는, Spark 가 Hive Metastore 를 사용해 테이블을 생성하고 읽기 때문입니다.
Managed Table 은 Hive 테이블과 데이터가 같이 관리됩니다. 즉 테이블을 삭제하면 데이터도 같이 삭제됩니다.
External (Unmanaged) Table 은 테이블과 데이터의 Life-cycle 이 분리되어 있습니다. 테이블을 삭제하면, '스키마' 인 테이블만 삭제되며 데이터는 그대로 존재합니다.
만약 여러분이 사용하는 Spark Session 이 Hive Metastore 에 연결되어있다면 SPARK SQL 의 DESCRIBE FORMATTED {TABLE} 쿼리를 통해 테이블의 타입을 볼 수 있습니다.
Managed / External 테이블별로 Hive 가 지원하는 기능이 조금은 다릅니다. Hive Confluence 에서 자세히 살펴 볼 수 있습니다.
일반적으로는 Hive External 테이블을 다음의 이유로 많이 사용합니다.
External 테이블을 사용하면 실수로 사용자가 테이블 삭제시에도 데이터가 보존되며
많은 경우에 테이블과 데이터의 Life-cycle 은 다르기 때문에 같이 삭제할 필요가 없기 때문입니다
만약 데이터를 더이상 사용하지 않는다면, External 테이블의 '스키마' 만 삭제하고 데이터는 보관처리 할 수 있습니다.
아니면 테이블은 그대로 살려두고 특정 기간동안의 데이터 (2020.01.01 ~ 2020.12.31 등 잘 사용되지 않는 과거 파티션) 만 저비용 스토리지로 내릴 수 있습니다.
Hive Managed Table 사용시 데이터는 외부에 테이블 스키마와 별도로 존재하는 것이 아니라 테이블과 같이 '관리' 됩니다. Hive 옵션인 (hive-site.xml) hive.metastore.warehouse.dir 위치에 데이터가 존재하게 됩니다.
만약 Spark 2.0+ 를 사용한다면, 이 옵션은 deprecated 되었고 대신 spark.sql.warehouse.dir 을 사용합니다.
아래의 두 가지 경우 모두 AWS EMR 을 이용한다면 Core 노드의 HDFS 에 기본으로 저장됩니다.
spark.sql.warehouse.dir (Spark 에서 Hive Metastore 를 이용해 Managed 테이블 생성시)
hive.metastore.warehouse.dir (Hive CLI 등 Hive 에서 직접 Managed 테이블 생성시)
그러나 EMR 종료시 HDFS 의 데이터는 전부 날아가므로 주의가 필요합니다. AWS 환경에서는 EMR 을 사용하더라도 HDFS 대신 S3 를 사용하는 편입니다.
Spark and Hive Table
Spark SQL 는 크게 3가지 방법으로 'CREATE TABLE' 을 이용해 테이블을 생성할 수 있습니다
saveAsTable() 은 mode = Overwrite 의 경우에는 현재 스키마를 무시합니다.
Catalog.createTable(), DataFrame.saveAsTable() 모두 파일의 경로 (LOCATION) 가 없을때는 Managed 테이블이 생성됩니다 (spark.sql.warehouse.dir)
Data Source 를 이용한 테이블 생성은 Spark 2.1 미만에서는 Hive 테이블 생성과는 다르게 동작할 수 있습니다.
From Spark 2.1, Datasource tables now store partition metadata in the Hive metastore. This means that Hive DDLs such as ALTER TABLE PARTITION ... SET LOCATION are now available for tables created with the Datasource API.
또한 Spark 2.0+ 부터는 에서 LOCATION 을 지정해 테이블을 생성하는 모든 경우에는 EXTERNAL 테이블을 생성합니다. Spark 에서는 LOCATION 을 지정해서 Managed Table 을 생성할 수 없습니다.
From Spark 2.0, CREATE TABLE ... LOCATION is equivalent to CREATE EXTERNAL TABLE ... LOCATION in order to prevent accidental dropping the existing data in the user-provided locations. That means, a Hive table created in Spark SQL with the user-specified location is always a Hive external table. Dropping external tables will not remove the data. Users are not allowed to specify the location for Hive managed tables. Note that this is different from the Hive behavior.
만약 CREATE TABLE (HIVE FORMAT) 이용해서 생성시 테이블에 TBLPROPERTIES ('spark.sql.sources.provider' = 'PARQUET') 들어가지 않아, S3 Path 오류가 발생할 수 있습니다.
(Spark 에서는 spark.sql 로 시작하는 TBLPROPERTIES 를 사용자가 직접 세팅 불가)
위에서서 살펴 보았듯이 테이블을 생성하기 위한 다양한 방법이 존재합니다. 또한 API 마다의 특징과 Spark, Hadoop 버전에 따른 호환성 및 각종 이슈가 존재하므로 이 섹션에서는 범용적인 방법에 대해서 다룹니다.
어떤 방법을 이용해 Hive Table 을 생성하는 편이 나을까요?
Catalog API 나 DataFrame API 를 이용할 수도 있겠지만, Spark 를 이용해 만든 Hive Metastore 의 테이블이 Presto 등 다양한 곳에서 읽힐 수 있다는 점을 고려해보면,
Spark DataFrame 에서 자동 변환되는 타입이 아니라 사용자가 정의한 타입이 명시된 Hive DDL 을 실행하는 편이 더 나을 수 있습니다
테이블은 Spark Batch Job 이 아니라 외부에서 DDL 등을 이용해 직접 생성하고, Batch Job 은 데이터만 적재하는 경우도 많습니다
만약 문제 발생시 DDL 을 다시 실행해야 할때, 데이터 적재가 포함된 Spark Batch Job (Application) 을 별도로 실행해야 한다는건 부담스럽습니다.DDL 을 별도로 관리하고 있다면, Jupyter 에서 Spark SQL 로 혹은 Hive CLI 에서 DDL 을 바로 실행할 수 있습니다.
실행 전 해당 S3 버켓이 존재하는지, 그리고 해당 버켓에 권한이 있는지 확인합니다.
spark.sql("""
CREATE DATABASE IF NOT EXISTS udon_db
COMMENT 'Udon DB' LOCATION 's3://udon-data-lake/db'
""")
spark.sql("""
CREATE TABLE IF NOT EXISTS udon_db.udon_topping_sales (
udon_topping_id BIGINT,
udon_topping_name STRING,
sales_price DOUBLE,
shipping_date DATE,
created_at TIMESTAMP
)
USING PARQUET
LOCATION 's3://udon-data-lake/db/udon_topping_sales'
COMMENT 'Udon Topping Sales'
""")
dfListingAvailabilityFinal\
.repartition(2)\
.write\
.format("parquet")\
.mode("overwrite")\
.save("s3://airbnb-data-lake/db/listing_availability")
# Spark Session 시작 후 스키마가 변경될 경우 REFRESH TABLE 을 이용해 테이블 정보를 다시 받아올 수 있습니다
# spark.catalog.refreshTable("airbnb_db.listing_availability") 와 동일
spark.sql("REFRESH TABLE airbnb_db.listing_availability")
spark.sql("""
SELECT *
FROM airbnb_db.listing_availability
""").show()
S3 를 살펴보면, repartition(2) 로 인해 2개의 Parquet 파일이 생성된 것을 볼 수 있습니다.
aws s3 ls s3://airbnb-data-lake/db/listing_availability/
part-00000-df4761db-25ba-46e3-8d66-3d2527057e78-c000.snappy.parquet
part-00001-df4761db-25ba-46e3-8d66-3d2527057e78-c000.snappy.parquet
Partitioned Table
위에서 생성한 udon_db.udon_topping_sales 과 airbnb_db.listing_availability 의 경우 파티션이 없는 테이블입니다.
지금 이야기 하는 파티션은 Hive 테이블의 파티션으로, Spark 의 repartition(2) 등에서 사용하는 파티션과는 다릅니다.
파티션은 일종의 디렉토리로서, 데이터를 나누는 역할을 합니다. 파티션 값을 쿼리에 지정함으로써대량의 데이터가 존재하는 테이블에 필요한 부분만 탐색할 수 있습니다.
> Partitioned tables can be created using the PARTITIONED BY clause. A table can have one or more partition columns and a separate data directory is created for each distinct value combination in the partition columns. Further, tables or partitions can be bucketed using CLUSTERED BY columns, and data can be sorted within that bucket via SORT BY columns. This can improve performance on certain kinds of queries.
아래와 같이 udon_topping_sales 라는 일별로 주문된 내역을 저장하고 있는 테이블이 있다고 가정해봅시다.
만약 파티션 컬럼 (dt) 없이 일반 컬럼 created_dt (Date 타입) 으로만 데이터를 탐색해야 한다면, "전체" 데이터를 다 읽으면서 created_dt 값을 이용해 필터링을 진행합니다.
반면 파티션 컬럼 (dt) 를 사용한다면, 디렉토리 (또는 Object Path in case of S3) 내의 데이터를 전혀 읽지 않아도 되므로 속도가 훨씬 빠릅니다.
SELECT *
FROM udon_db.udon_topping_sales
WHERE
dt BETWEEN '20211001' AND '20211031' -- Partition 필터링 (데이터를 읽지 않고 스킵)
AND (created_dt >= DATE('20211001') AND created_dt <= DATE('20211031')) -- Row 필터링 (데이터를 읽어서 판별)
파티션이 없는 테이블은 데이터 적재 관점에서도 어려움을 만듭니다.
만약 스냅샷 데이터 (e.g., 누적된 전체 주문건) 을 매일매일 적재해야 한다면 새로운 S3 경로에 쌓고 DDL 을 통해 Table 의 LOCATION 을 변경해야 할까요?
과거 시점의 스냅샷 데이터를 읽고싶다면 어떻게 해야할까요?
만약 Delta 데이터 (e.g., 당일 생성 주문건) 을 매일 추가로 만들어서 같은 테이블 내에서 쓰게 해야한다면 Append 해야할까요?
File 이 temp 에 써진 후 작업 종료 이후에 대상 S3 버켓으로 옮겨지는 순간에는 데이터를 읽을 수 있을까요?
Hive 테이블의 파티션은 2 가지로 나눌 수 있습니다
Dynamic Partition: 데이터를 보고 어떤 파티션인지 (어디에 위치할지를) 결정합니다
데이터를 하나씩 보면서 파티션을 결정해야 하므로 속도가 느릴 수 있습니다.
Static Partition: 데이터를 보지 않고, 사용자가 어떤 파티션인지 (어디에 위치할지를) 직접 지정합니다.
일반적으로 Static Partition 을 사용합니다. 데이터를 가공하는 시점에 대부분 파티션을 알고 있기 때문입니다.
데이터를 보고 결정하지 않아도 되므로 Partition 값을 위한 데이터 컬럼이 필요하지 않습니다.
"Note that you do not need to add an input column for the static partition column because its value is already known in the PARTITION clause."
Static / Partition 관련해서는 다음의 Hive Confluence 문서를 추가적으로 참조할 수 있습니다
Spark 에서는 다음의 옵션을 통해 Dynamic Partition 을 활성화 할 수 있습니다.
# spark-default.conf 에서 전역으로 설정하거나
# spark session 내에서 세션 범위로 설정
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
이제 이전 섹션에서 만들어 두었던 listing_meta, listing_calendar 를 위한 Partition 테이블을 생성해보겠습니다.
spark.sql("""
CREATE TABLE IF NOT EXISTS airbnb_db.listing_meta (
listing_id BIGINT,
listing_name STRING,
listing_url STRING,
property_type STRING,
country STRING,
state STRING,
city STRING,
description STRING,
dt STRING
)
USING PARQUET
PARTITIONED BY (dt)
LOCATION 's3://airbnb-data-lake/db/listing_meta'
COMMENT 'Airbnb Listing Meta'
""")
실행 후에 Hive CLI 나 spark.sql 로 테이블을 확인해 보면 다음과 같습니다. (Spark 버전 차이로 인해 일부 값이 다를 수 있습니다.)
이제 데이터를 S3 에 Parquet 포맷으로 적재할텐데, 2가지 방법으로 나누어 진행해보겠습니다.
listing_calendar 는 DataFrame 의 partitionBy() 와 saveAsTable 을 이용해서 데이터를 저장해 보겠습니다.
partitionListingMeta = '20211101'
spark.sql(f"""
INSERT OVERWRITE airbnb_db.listing_meta
PARTITION (dt = '{partitionListingMeta}')
SELECT
CAST(id AS BIGINT) as listing_id,
name as listing_name,
listing_url,
property_type,
country,
state,
city,
description
FROM LISTING_META -- 이전 섹션에서 만든 Temporary View 입니다
""")
이제 만든 테이블 내의 데이터를 SELECT 해 DataFrame 으로 만들고 실행 계획과 데이터를 확인해보겠습니다. PartitionFilter 를 통해 지정된 Location 의 데이터만 탐색했음을 알 수 있습니다.
dfTableListingMeta = spark.sql("""
SELECT *
FROM airbnb_db.listing_meta
WHERE dt = '20211101'
""")
dfTableListingMeta.explain("FORMATTED")
dfTableListingMeta.show()
기존 파티션이 삭제된 이유는 sql.sources.partitionOverwriteMode = static 기본값으로 잡혀 있기 때문입니다. (파티션 뿐만 아니라 S3 내 데이터도 삭제된 걸 확인할 수 있습니다.)
위에서 만들었던 listing_meta (INSERT OVERWRITE PARTITION (..)) 는 static 파티션이기 때문에 문제가 없었습니다. (Partition 없을 경우에만 생성됩니다)
그러나 partitionBy("dt") 사용시에는 데이터를 기준으로 파티션이 결정되는 dynamic 파티션처럼 동작합니다. 그리고 partitionOverwriteMode = static 이 Default 이기 때문에 데이터 적재시 전체 파티션이 삭제됩니다.
spark.sql.sources.partitionOverwriteMode 옵션은 Spark 2.3 에 추가되었습니다.
> When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't affect Hive serde tables, as they are always overwritten with dynamic mode. This can also be set as an output option for a data source using key partitionOverwriteMode (which takes precedence over this setting), e.g. dataframe.write.option("partitionOverwriteMode", "dynamic").save(path).
spark.sql.sources.partitionOverwriteMode = dynamic 으로 전체 파티션이 삭제되는 것을 막을 수 있습니다. 있으나 만약 PathOutputCommitProtocol 를 사용한다면 Hadoop 3.1+ 을 쓰는 경우에는 spark.sql.sources.partitionOverwriteMode = dynamic 이 사용 불가능합니다.
그리고 AWS EMRFS S3-Optimized Commiter 를 이용할 경우에는 spark.sql.sources.partitionOverwriteMode = static 여야 합니다 (dynamic 사용 불가)
Improves application performance by avoiding list and rename operations done in Amazon S3 during job and task commit phases.
Avoids issues that can occur with Amazon S3 eventual consistency during job and task commit phases, and helps improve job correctness under task failure conditions.
이런 이유로 인해 일부 환경에서 (Hadoop 3.1+, PathOutputCommitProtocol) DataFrame.partitionBy().saveAsTable() 사용이 전체 파티션 삭제를 유발하므로, INSERT OVERWRITE INTO 로 변경해보겠습니다.
ALTER TABLE ADD PARTITION DDL 을 통해 파티션을 미리 생성합니다 (미리 만들지 않아도 INSERT INTO 실행시 자동 생성됩니다. 이 섹션에서는 실습을 위해 직접 만듭니다)
INSERT OVERWRITE INTO 로 넣을 경우 Static 파티션 을 이용하므로 dt 컬럼이 필요 없습니다. drop() 함수를 통해 제거합니다
INSERT OVERWITE INTO 를 위해 현재까지 가공한 DataFrame 을 createOrReplaceView() 함수를 사용해 View 로 만듭니다
INSERT OVERWRITE INTO 를 실행합니다
Pandas 라이브러리를 이용해 반복문을 위한 파티션 값 (dt) 를 생성합니다. (Pandas 를 이용하지 않아도 됩니다)
반복문을 돌며 dt 에 맞는 Row 만 WHERE 에서 추출하고
순서대로 진행해보겠습니다.
# 테스트를 위해 2일치 (2021.10.01 ~ 2021.10.02) 만 세팅
import pandas as pd
partitions = pd.date_range(start='20211002',end='20211002',freq='D').strftime('%Y%m%d')
# dt 컬럼 제거 및 View 등록
dfListingCalendarRefined.drop("dt").createOrReplaceTempView("LISTING_CALENDAR_RAW")
# 반복문 내에서 파티션 등록 및 INSERT INTO 실행
for p in partitions:
spark.sql(f"""
ALTER TABLE airbnb_db.listing_calendar ADD IF NOT EXISTS PARTITION (
dt = '{p}'
)
LOCATION 's3://airbnb-data-lake/db/listing_calendar/dt={p}';
""")
spark.sql(f"""
INSERT OVERWRITE airbnb_db.listing_calendar
PARTITION (dt = '{p}')
SELECT /*+ REPARTITION(2) */ *
FROM LISTING_CALENDAR_RAW
WHERE date = to_date('{p}', 'yyyyMMdd')
""")
위 코드를 실행하면 파티션 생성과 2일치 파티션을 위한 데이터가 적재됩니다. 데이터를 추출 해보겠습니다.
spark.sql("""
SELECT *
FROM airbnb_db.listing_calendar
WHERE listing_id = 35886541
""").show()
S3 데이터를 살펴보면 REPARTITION 힌트가 잘 동작해 파티션당 파일이 2개임을 확인할 수 있습니다.
$ aws s3 ls --recursive s3://airbnb-data-lake/db/listing_calendar/dt=20211001/
airbnb-data-lake/db/listing_calendar/dt=20211001/part-00000-1b3d55a5-00e1-4153-858f-16c7425b7dc4.c000.snappy.parquet
airbnb-data-lake/db/listing_calendar/dt=20211001/part-00001-1b3d55a5-00e1-4153-858f-16c7425b7dc4.c000.snappy.parquet
airbnb-data-lake/db/listing_calendar/dt=20211002/part-00000-c28bb9a9-1846-4d08-9149-2c8eb58dbd67.c000.snappy.parquet
airbnb-data-lake/db/listing_calendar/dt=20211002/part-00001-c28bb9a9-1846-4d08-9149-2c8eb58dbd67.c000.snappy.parquet
만약 수동으로 파티션을 제거하고 싶다면 DROP PARTITION DDL 을 사용하면 됩니다. 여러 파티션을 삭제하고 싶을때는 다음처럼 범위 연산자를 이용할 수 있습니다.
ALTER TABLE airbnb_db.listing_calendar DROP PARTITION (dt > '20211001'), PARTITION (yyyymmdd < '20211101');
DataFrame.partitionBy() 함수로 적재시 파티션이 자동생성된 것도 볼 수 있습니다. 파티션은 어떻게 관리하면 좋을지 이야기 해봅시다.
만약 사용자가 실수로 파티션이 없는 테이블에 대해 수많은 파티션을 자동으로 생성하게 되면 Hive Meastore 에 부하가 가 단순 읽기 작업을 하는 Spark 작업도 밀릴 수 있습니다.
따라서 대량 Partition 추가는 Spark 작업과는 별개로 관리하고, 필요하다면 미리 해 놓는 편이 좋습니다.
단건 (= 1건) Partition 추가는 Spark 작업내에서 spark.sql("ADD PARTITION IF NOT EXISTS") 로 관리한다면 편리하게 운영할 수 있습니다.
혹은 spark.sql.sources.partitionOverwriteMode = 'dynamic' 를 이용할 수 있으나, partitionBy() 에 파티션 생성을 맡기는 것은 데이터의 유무로 인해 불확정적이므로 Spark Job 시작 부분에서 실행되는 ADD PARTITION IF NOT EXISTS 가 나을 수 있습니다.
Practice
실습 과제입니다.
Hive Metastore 실습
Hive Metastore 는 대부분의 데이터 시스템에서 근간으로 사용되는 서비스입니다.
최근에 자주 사용되는 실시간 조회를 지원하는 Delta Lake 용 스토리지인 Hudi, Iceberg 로 가더라도 Hive Metastore 는 여전히 필수적인 요소입니다.
이 챕터에서는 Hive Metatore 에 대한 실습을 두 가지로 진행합니다.
Spark 내장 Hive Metastore 및 Embedded Java RDB 인 Derby 를 사용해 로컬 모드에서 외부 Hive Metastore 없이 Spark 를 사용해 봅니다.
Spark 내 conf/hive-site.xml 을 세팅해 로컬 모드에서 외부 Hive Metastore 에 연결해 Spark 를 사용해 봅니다.
실습은 Practical Spark 세션에서 진행됩니다.
Summary
이번 챕터에서는 Spark SQL 에 대해 알아보았습니다. Spark SQL 은 Hive 의 SQL 문법을 포함해 많은 기능을 지원하나, 일부 기능은 지원되지 않는 경우도 있습니다.
Spark SQL 의 다른 기능들도 많으니 살펴보시기 바랍니다.
를 통해 DataFrame 을 View 로 등록해 SQL 구문에서 사용할 수 있고 (spark.catalog.DropView 를 통해 제거)
일반적인 연산을 위한 이나 은 물론 과 같은 다양한 타입과 그를 위한 함수들도 제공합니다.
필요하다면 사용자 정의 함수 () 를 등록해 SQL 구문 내에서 사용하는 것이 가능하며
DataFrame.cache() 와 같은 인프라 수준의 기능도 구문을 통해 지원합니다.
Spark SQL 을 실습해보며 기본적인 사용법을 익혀보겠습니다.
DataFrame. 함수를 통해 DataFrame 을 View 로 등록해 SQL 내에서 테이블처럼 사용할 수 있습니다.
상품 메타 (LISTING_META) 가 기준이 되는 테이블이고, 상품 이벤트 (LISTING_CALENDAR) 를 요약해서 을 수행했습니다.
다만 이 때, 상품 이벤트가 존재하지 않을 수 있으므로 (주문이 없거나 누락 등) 함수를 이용해서 NULL 값이 나오지 않도록 합니다
위의 예제와 같이 만약 0 값이 비즈니스적으로 어떤 의미를 가진다면 를 사용하지 않고 0 대신 NULL 을 사용할수도 있습니다
이제 LISTING_AVAILABILITY 를 View 로 등록하고 review_scores_rating > 0 인 경우에만 LISTING_STAT View 로 만든 뒤 구문을 이용해 캐싱해보겠습니다.
위의 예제에서는 spark.sql 을 이용해 현재 생성된 View (임시 테이블) 을 목록을 확인했습니다. 를 이용해서도 가능합니다. Catalog API 는 테이블 생성 / 조회 등 Metastore 관련된 함수들을 제공합니다.
이제까지의 예제에서는 DataFrame.createOrReplaceTempVeiw() 를 사용해서 View 를 만들었지만 구문을 이용하면 위의 과 같이 SQL 쿼리를 이용해서 View 를 생성하는것도 가능합니다.
AWS 사용시 Metastore 는 EMR 에서 Hive 를 선택해 이용하거나 아니면 를 이용할 수도 있습니다.
는 Hive Metastore 와 호환되는 관리형 메타스토어 서비스입니다.
(STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler')
(STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler')
(STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler')
Presto 와 같은 범용 쿼리 엔진은 를 통해 여러 저장소를 직접 조회할 수 있습니다.
다만 일부 제약 조건이 있을 수 있습니다 ()
GCP 를 사용한다면 Serverless Hive Metastore 인 를 이용할 수 있습니다
시작 전에 한가지 이야기 하고 넘어가야 할 것은 지금까지 함수를 통해 생성했던 (Temporary) View 는 Table 이 아닙니다.
S3 를 이용한다면 를 이용해 테이블은 삭제하고 데이터 전체를 살려둔 채 저비용 스토리지로 내려 보관처리 하거나
S3 의 데이터를 등 저비용 스토리지 클래스로 변경 및 복원시에는 일부 시간이 소요될 수 있습니다. 따라서 당장 복구해야 하는 경우가 생길 수 있다면, Glacier 보다는 높은 비용의 스토리지 클래스 사용이 권장됩니다.
테이블을 생성하기 위해 Catalog API 를 사용한다면 을 이용할 수 있습니다.
만약 DataFrame API 를 사용한다면 을 사용해 테이블을 만들 수 있습니다. (테이블이 없는 경우)
DataFrame. 는 DataFrame.saveAsTable 과는 달리 컬럼의 이름이 아니라 포지션을 이용해 데이터를 적재하므로, 사용하지 않는 편이 낫습니다.
구문과 구문을 이용해 데이터베이스와 External 테이블을 생성해보겠습니다. 아래는 몇 가지 주의사항입니다.
문서에서 Hive 가 지원하는 타입에 대해 볼 수 있습니다.
만약 Hive 테이블을 Parquet 포맷으로 Spark 로 만들되, Presto 등 다른 엔진에서 사용한다면 등 해당 엔진에서 지원하는 타입과도 호환되는지 확인이 필요합니다.
위의 예제에서는 저장 형식으로 많이 사용되는 Columnar Format 인 Parquet 를 사용했습니다. 그런데 Spark 에서 Parquet 파일을 Hive 테이블로 만들때 주의사항이 몇 가지 있습니다.
의 경우에도 spark.sql.hive.convertMetastoreParquet 옵션이 활성화 되어 있어야 합니다.
최근에는 이런 Versioning 및 Update 및 문제를 해결하기 위한 별도의 Table Format () 이나 시스템 () 가 나오고 있습니다.
도 이러한 문제를 해결하는데 도움이 될 수 있지만, Limitation 이 많고 Hive (or ) 를 통해 직접 데이터를 읽는 경우가 적습니다.
listing_meta 는 SQL 의 를 이용해서 데이터를 저장하고
만약 Overwrite 가 아니라 Append 하고 싶다면 를 사용할 수 있습니다.
이제 listing_calendar 적재를 위해 DataFrame 의 를 이용해 보겠습니다. Spark 문서에서 Generic Load / Save 함수와 Parquet 의 partition 부분을 읽어보면 사용법에 대해 쉽게 파악할 수 있습니다.
AWS EMR 과 S3 를 사용하는 경우에 S3-Optimized Commiter 로 인해 PathOutputCommitProtocol 를 선택하는 경우가 많습니다. ()
EMRFS S3-Optimized Commiter 를 이용하지 않아도 문제가 없습니다. 다만 AWS EMR / S3 를 이용할 경우에는 로 높은 쓰기 성능을 얻을 수 있습니다.