propertyTypeHouse ='House'dfListingHouseAvailable = spark.sql(f"""WITH LISTING_HOUSE AS ( SELECT id as listing_id, listing_url, name FROM LISTING_META WHERE property_type = '{propertyTypeHouse}')SELECT LISTING_CALENDAR.listing_id, listing_url, name as listing_name, date, available, priceFROM LISTING_CALENDARINNER JOIN LISTING_HOUSE ON LISTING_CALENDAR.listing_id = LISTING_HOUSE.listing_idWHERE LISTING_CALENDAR.available = 't'""")
# dfListingHouseAvailable.show()
+----------+--------------------+--------------------+----------+---------+-------+
|listing_id| listing_url| listing_name| date|available| price|
+----------+--------------------+--------------------+----------+---------+-------+
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-01| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-13| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-14| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-15| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-21| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-22| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-26| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-27| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-28| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2019-12-29| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-03| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-04| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-05| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-10| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-11| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-12| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-17| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-18| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-19| t|$130.00|
| 3138055|https://www.airbn...|Classic Home on C...|2020-01-24| t|$130.00|
+----------+--------------------+--------------------+----------+---------+-------+
위에서 사용한 쿼리 예제에서는 LISTING_META (Product Meta) 와 LISTING_CALENDAR(Product Meta) 테이블을 조인했습니다.
Product Meta + Product Event (Transaction, Calendar, Activity) 조합은 앞으로 여러분들이 굉장히 많이 사용할 데이터 결합 패턴입니다.
상품의 속성은 일반적으로 별도 테이블로 존재하고 상품 이벤트에 비해 비교적 적은 Row 로 구성되어 있습니다
상품의 이벤트는 일반적으로 상품 메타에 비해 데이터 사이즈가 매우 큽니다
예를 들어, 상품은 1개여도 해당 상품에 대한 주문은 여러번 발생할 수 있습니다
도메인에 따라 상품 메타가 더 자세히 나뉘기도 합니다
커머스의 경우에는 상품 > 상품 옵션일수도 있습니다
여행의 경우에는 호텔 > 룸 > 레이트 플랜 (룸 + 옵션) > 날짜 와 같이 더 복잡한 형태로 구성되기도 합니다
위와 같이 상품 이벤트에 대해 상품 메타를 붙이는 경우도 있지만 많은 경우에 상품 메타를 축으로 잡고 집계 연산을 수행할 수 있습니다. (e.g, 통계 데이터 등)
dfListingAvailability = spark.sql("""WITH LISTING_GROUPED AS ( SELECT listing_id, count(CASE WHEN available = 't' THEN 1 END) as count_date_available, count(CASE WHEN available <> 't' THEN 1 END) as count_date_unavailable FROM LISTING_CALENDAR GROUP BY listing_id)SELECT LISTING_META.id as listing_id, LISTING_META.listing_url as listing_url, LISTING_META.name as listing_name, LISTING_META.review_scores_rating as review_scores_rating, LISTING_META.price as listing_price_basic, coalesce(LISTING_GROUPED.count_date_available, 0) as count_date_available, coalesce(LISTING_GROUPED.count_date_unavailable, 0) as count_date_unavailableFROM LISTING_METALEFT JOIN LISTING_GROUPED ON LISTING_META.id = LISTING_GROUPED.listing_id""")
상품 메타 (LISTING_META) 가 기준이 되는 테이블이고, 상품 이벤트 (LISTING_CALENDAR) 를 요약해서 LEFT JOIN 을 수행했습니다.
다만 이 때, 상품 이벤트가 존재하지 않을 수 있으므로 (주문이 없거나 누락 등) coalesce 함수를 이용해서 NULL 값이 나오지 않도록 합니다
위의 예제와 같이 만약 0 값이 비즈니스적으로 어떤 의미를 가진다면 coalesce 를 사용하지 않고 0 대신 NULL 을 사용할수도 있습니다
LISTING_CALENDAR 는 listing_id 당 중복되는 'date' 가 없다고 가정합니다. 만약 중복이 있다면 distinct 를 사용하거나 중복이 없는 상품 메타 단위로 더 세분화 해 집계할 수 있습니다
통계 등을 위한 조인 및 집계 관련해서는 추후에 다른 챕터에서 더 설명하도록 하겠습니다. 아래 그림을 통해 JOIN 종류에 따른 결과를 시각적으로 볼 수 있습니다.
이제 LISTING_AVAILABILITY 를 View 로 등록하고 review_scores_rating > 0 인 경우에만 LISTING_STAT View 로 만든 뒤 CACHE TABLE 구문을 이용해 캐싱해보겠습니다.
spark.sql("""CACHE TABLE LISTING_STAT AS ( SELECT * FROM LISTING_AVAILABILITY WHERE review_scores_rating IS NOT NULL)""")spark.sql("SHOW TABLES").show()
이제까지의 예제에서는 DataFrame.createOrReplaceTempVeiw() 를 사용해서 View 를 만들었지만 CREATE VIEW 구문을 이용하면 위의 CACHE TABLE AS (...) 과 같이 SQL 쿼리를 이용해서 View 를 생성하는것도 가능합니다.
2021년 10월 16일의 해당 테이블 데이터는 s3://udon-data-lake/db/udon_topping_sales/2021/10/16" 위치에 있고
미래에 그 위치가 변해도 "내가 알려줄거야" 라고 말한다면 어떨까요?
그 시스템이 바로 메타스토어 입니다. (메타 정보를 저장하고 있는 시스템)
Hive Metastore 는 Spark 와 자주 같이 쓰이곤 합니다. 아래 스크린샷은 Spark 에서 Hive Metastore 를 통해 RDS (MySQL 등) 에 있는 Table 및 Partition 의 정보를 읽어 S3 의 위치를 찾아 낸 뒤 Spark 를 통해 읽는 경우를 보여줍니다. 즉, Metastore 는 테이블의 실제 데이터인 '파일' 이 어디에 있는지만 저장하지 파일을 저장하진 않습니다.
아래 그림에서는 On-Prem 에서 Hadoop / Hive Metastore 에 대응되는 AWS 의 서비스를 보여줍니다.
Hadoop Cluster 는 Computing (Yarn) + Storage (HDFS) 으로 구성되는데
AWS 사용시 Storage (HDFS) 는 S3 로 대체하거나 EMR 내의 Core 노드로 HDFS 를 운영이 가능합니다
AWS 사용시 Computing 은 EMR 의 Core (AM) + Task 노드로 대체가 가능합니다
AWS 사용시 Metastore 는 EMR 에서 Hive 를 선택해 이용하거나 아니면 AWS Glue Catalog 를 이용할 수도 있습니다.
Metastore 서비스에 Database, Table, Partition 에 대한 정보를 넣게 되면 수 많은 시스템에서 데이터를 조회하는데 사용할 수 있습니다. 아래 그림은 Hive Metastore 를 통해 Presto, Spark SQL 등에서 S3 는 물론 다양한 Storage 로 접근하는 사례를 보여줍니다.
대부분의 기업들은 Spark 이외에도 Presto 와 같은 범용 쿼리 엔진을 운영합니다. 모든 사용자가 Spark 를 사용할 수 있는것은 아니기 때문입니다. 따라서 Hive Metastore 도 같이 운용되고 있습니다.
Hive Metastore 부분을 조금 더 살펴보면, 위 스크린샷과 같이 크게 3가지 운영 형태로 나눌 수 있습니다
Spark 는 아무런 설정을 하지 않을 경우 Embedded Metastore 로 동작합니다.
노트북 등에서 로컬 환경 테스팅용으로 사용할 수 있습니다.
만약 외부에 HMS 가 없고, Spark 에서 Embded Metastore 모드를 이용하되 RDB 만 외부 것을 이용하고 싶다면 Local Metastore 모드로 사용할 수 있습니다
공유되는 HMS 가 없어 운영 등 노동 투입의 여지가 적으나, Production 에는 권장되지 않습니다.
EMR 등에 이미 떠 있는 HMS (Hive Metastore Service) 값을 spark/conf/hive-site.xml 내에 hive.metastore.uris 프로퍼티로 세팅하면 Remote Metastore 모드로 동작합니다.
대부분의 Production 환경에서는 Remote Metastore 형태로 사용합니다. HMS 를 별도로 운영하고, Spark Client 들이 접속해 사용합니다.
# spark/conf/hive-site.xml
# Remote Hive Metastore 연결을 위한 Spark 설정
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hive-metastore-prod.udon.io:9083</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
</configuration>
# spark/conf/hive-site.xml
# Local Hive Metastore 사용을 위한 외부 RDB 연결 설정
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost/metastore_db?createDatabaseIfNotExist=true</value>
<description>metadata is stored in a MySQL server</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>MySQL JDBC driver class</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>USER</value>
<description>user name for connecting to mysql server </description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>PASSWORD</value>
<description>password for connecting to mysql server </description>
</property>
</configuration>
만약 Hive Metastore 대신 Glue Catalog 를 Spark 의 Metastore 로 사용한다면, 다음 문서의 가이드를 따라 각종 컴퓨팅 프레임워크의 메타스토어로 Glue 를 설정할 수 있습니다.
테이블을 생성하기 위해 Catalog API 를 사용한다면 createTable() 을 이용할 수 있습니다.
만약 DataFrame API 를 사용한다면 saveAsTable() 을 사용해 테이블을 만들 수 있습니다. (테이블이 없는 경우)
saveAsTable() 은 mode = Overwrite 의 경우에는 현재 스키마를 무시합니다.
Catalog.createTable(), DataFrame.saveAsTable() 모두 파일의 경로 (LOCATION) 가 없을때는 Managed 테이블이 생성됩니다 (spark.sql.warehouse.dir)
DataFrame.insertInto() 는 DataFrame.saveAsTable 과는 달리 컬럼의 이름이 아니라 포지션을 이용해 데이터를 적재하므로, 사용하지 않는 편이 낫습니다.
Data Source 를 이용한 테이블 생성은 Spark 2.1 미만에서는 Hive 테이블 생성과는 다르게 동작할 수 있습니다.
From Spark 2.1, Datasource tables now store partition metadata in the Hive metastore. This means that Hive DDLs such as ALTER TABLE PARTITION ... SET LOCATION are now available for tables created with the Datasource API.
또한 Spark 2.0+ 부터는 에서 LOCATION 을 지정해 테이블을 생성하는 모든 경우에는 EXTERNAL 테이블을 생성합니다. Spark 에서는 LOCATION 을 지정해서 Managed Table 을 생성할 수 없습니다.
From Spark 2.0, CREATE TABLE ... LOCATION is equivalent to CREATE EXTERNAL TABLE ... LOCATION in order to prevent accidental dropping the existing data in the user-provided locations. That means, a Hive table created in Spark SQL with the user-specified location is always a Hive external table. Dropping external tables will not remove the data. Users are not allowed to specify the location for Hive managed tables. Note that this is different from the Hive behavior.
만약 CREATE TABLE (HIVE FORMAT) 이용해서 생성시 테이블에 TBLPROPERTIES ('spark.sql.sources.provider' = 'PARQUET') 들어가지 않아, S3 Path 오류가 발생할 수 있습니다.
(Spark 에서는 spark.sql 로 시작하는 TBLPROPERTIES 를 사용자가 직접 세팅 불가)
위에서서 살펴 보았듯이 테이블을 생성하기 위한 다양한 방법이 존재합니다. 또한 API 마다의 특징과 Spark, Hadoop 버전에 따른 호환성 및 각종 이슈가 존재하므로 이 섹션에서는 범용적인 방법에 대해서 다룹니다.
어떤 방법을 이용해 Hive Table 을 생성하는 편이 나을까요?
Catalog API 나 DataFrame API 를 이용할 수도 있겠지만, Spark 를 이용해 만든 Hive Metastore 의 테이블이 Presto 등 다양한 곳에서 읽힐 수 있다는 점을 고려해보면,
Spark DataFrame 에서 자동 변환되는 타입이 아니라 사용자가 정의한 타입이 명시된 Hive DDL 을 실행하는 편이 더 나을 수 있습니다
테이블은 Spark Batch Job 이 아니라 외부에서 DDL 등을 이용해 직접 생성하고, Batch Job 은 데이터만 적재하는 경우도 많습니다
만약 문제 발생시 DDL 을 다시 실행해야 할때, 데이터 적재가 포함된 Spark Batch Job (Application) 을 별도로 실행해야 한다는건 부담스럽습니다.DDL 을 별도로 관리하고 있다면, Jupyter 에서 Spark SQL 로 혹은 Hive CLI 에서 DDL 을 바로 실행할 수 있습니다.
dfListingAvailabilityFinal\.repartition(2)\.write\.format("parquet")\.mode("overwrite")\.save("s3://airbnb-data-lake/db/listing_availability")# Spark Session 시작 후 스키마가 변경될 경우 REFRESH TABLE 을 이용해 테이블 정보를 다시 받아올 수 있습니다# spark.catalog.refreshTable("airbnb_db.listing_availability") 와 동일spark.sql("REFRESH TABLE airbnb_db.listing_availability")spark.sql(""" SELECT * FROM airbnb_db.listing_availability""").show()
> Partitioned tables can be created using the PARTITIONED BY clause. A table can have one or more partition columns and a separate data directory is created for each distinct value combination in the partition columns. Further, tables or partitions can be bucketed using CLUSTERED BY columns, and data can be sorted within that bucket via SORT BY columns. This can improve performance on certain kinds of queries.
아래와 같이 udon_topping_sales 라는 일별로 주문된 내역을 저장하고 있는 테이블이 있다고 가정해봅시다.
만약 파티션 컬럼 (dt) 없이 일반 컬럼 created_dt (Date 타입) 으로만 데이터를 탐색해야 한다면, "전체" 데이터를 다 읽으면서 created_dt 값을 이용해 필터링을 진행합니다.
반면 파티션 컬럼 (dt) 를 사용한다면, 디렉토리 (또는 Object Path in case of S3) 내의 데이터를 전혀 읽지 않아도 되므로 속도가 훨씬 빠릅니다.
SELECT*FROM udon_db.udon_topping_sales WHERE dt BETWEEN '20211001' AND '20211031' -- Partition 필터링 (데이터를 읽지 않고 스킵)
AND (created_dt >=DATE('20211001') AND created_dt <=DATE('20211031')) -- Row 필터링 (데이터를 읽어서 판별)
파티션이 없는 테이블은 데이터 적재 관점에서도 어려움을 만듭니다.
만약 스냅샷 데이터 (e.g., 누적된 전체 주문건) 을 매일매일 적재해야 한다면 새로운 S3 경로에 쌓고 DDL 을 통해 Table 의 LOCATION 을 변경해야 할까요?
과거 시점의 스냅샷 데이터를 읽고싶다면 어떻게 해야할까요?
만약 Delta 데이터 (e.g., 당일 생성 주문건) 을 매일 추가로 만들어서 같은 테이블 내에서 쓰게 해야한다면 Append 해야할까요?
File 이 temp 에 써진 후 작업 종료 이후에 대상 S3 버켓으로 옮겨지는 순간에는 데이터를 읽을 수 있을까요?
최근에는 이런 Versioning 및 Update 및 문제를 해결하기 위한 별도의 Table Format (Apache Iceberg) 이나 시스템 (Apache Hudi) 가 나오고 있습니다.
만약 Overwrite 가 아니라 Append 하고 싶다면 INSERT INTO 를 사용할 수 있습니다.
listing_calendar 는 DataFrame 의 partitionBy() 와 saveAsTable 을 이용해서 데이터를 저장해 보겠습니다.
partitionListingMeta ='20211101'spark.sql(f"""INSERT OVERWRITE airbnb_db.listing_meta PARTITION (dt = '{partitionListingMeta}')SELECT CAST(id AS BIGINT) as listing_id, name as listing_name, listing_url, property_type, country, state, city, descriptionFROM LISTING_META -- 이전 섹션에서 만든 Temporary View 입니다""")
이제 만든 테이블 내의 데이터를 SELECT 해 DataFrame 으로 만들고 실행 계획과 데이터를 확인해보겠습니다. PartitionFilter 를 통해 지정된 Location 의 데이터만 탐색했음을 알 수 있습니다.
dfTableListingMeta = spark.sql(""" SELECT * FROM airbnb_db.listing_meta WHERE dt = '20211101'""")dfTableListingMeta.explain("FORMATTED")dfTableListingMeta.show()
이제 listing_calendar 적재를 위해 DataFrame 의 partitionBy() 를 이용해 보겠습니다. Spark 문서에서 Generic Load / Save 함수와 Parquet 의 partition 부분을 읽어보면 사용법에 대해 쉽게 파악할 수 있습니다.
spark.sql.sources.partitionOverwriteMode 옵션은 Spark 2.3 에 추가되었습니다.
> When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. In static mode, Spark deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. By default we use static mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't affect Hive serde tables, as they are always overwritten with dynamic mode. This can also be set as an output option for a data source using key partitionOverwriteMode (which takes precedence over this setting), e.g. dataframe.write.option("partitionOverwriteMode", "dynamic").save(path).
spark.sql.sources.partitionOverwriteMode = dynamic 으로 전체 파티션이 삭제되는 것을 막을 수 있습니다. 있으나 만약 PathOutputCommitProtocol 를 사용한다면 Hadoop 3.1+ 을 쓰는 경우에는 spark.sql.sources.partitionOverwriteMode = dynamic 이 사용 불가능합니다.
AWS EMR 과 S3 를 사용하는 경우에 S3-Optimized Commiter 로 인해 PathOutputCommitProtocol 를 선택하는 경우가 많습니다. (Spark Cloud Integration Guide)
그리고 AWS EMRFS S3-Optimized Commiter 를 이용할 경우에는 spark.sql.sources.partitionOverwriteMode = static 여야 합니다 (dynamic 사용 불가)
EMRFS S3-Optimized Commiter 를 이용하지 않아도 문제가 없습니다. 다만 AWS EMR / S3 를 이용할 경우에는 S3-Optimized Commiter 로 높은 쓰기 성능을 얻을 수 있습니다.
Improves application performance by avoiding list and rename operations done in Amazon S3 during job and task commit phases.
Avoids issues that can occur with Amazon S3 eventual consistency during job and task commit phases, and helps improve job correctness under task failure conditions.
이런 이유로 인해 일부 환경에서 (Hadoop 3.1+, PathOutputCommitProtocol) DataFrame.partitionBy().saveAsTable() 사용이 전체 파티션 삭제를 유발하므로, INSERT OVERWRITE INTO 로 변경해보겠습니다.
ALTER TABLE ADD PARTITION DDL 을 통해 파티션을 미리 생성합니다 (미리 만들지 않아도 INSERT INTO 실행시 자동 생성됩니다. 이 섹션에서는 실습을 위해 직접 만듭니다)
INSERT OVERWRITE INTO 로 넣을 경우 Static 파티션 을 이용하므로 dt 컬럼이 필요 없습니다. drop() 함수를 통해 제거합니다
INSERT OVERWITE INTO 를 위해 현재까지 가공한 DataFrame 을 createOrReplaceView() 함수를 사용해 View 로 만듭니다
INSERT OVERWRITE INTO 를 실행합니다
Pandas 라이브러리를 이용해 반복문을 위한 파티션 값 (dt) 를 생성합니다. (Pandas 를 이용하지 않아도 됩니다)
# 테스트를 위해 2일치 (2021.10.01 ~ 2021.10.02) 만 세팅import pandas as pdpartitions = pd.date_range(start='20211002',end='20211002',freq='D').strftime('%Y%m%d')# dt 컬럼 제거 및 View 등록dfListingCalendarRefined.drop("dt").createOrReplaceTempView("LISTING_CALENDAR_RAW")# 반복문 내에서 파티션 등록 및 INSERT INTO 실행for p in partitions: spark.sql(f""" ALTER TABLE airbnb_db.listing_calendar ADD IF NOT EXISTS PARTITION ( dt = '{p}' ) LOCATION 's3://airbnb-data-lake/db/listing_calendar/dt={p}'; """) spark.sql(f""" INSERT OVERWRITE airbnb_db.listing_calendar PARTITION (dt = '{p}') SELECT /*+ REPARTITION(2) */ * FROM LISTING_CALENDAR_RAW WHERE date = to_date('{p}', 'yyyyMMdd') """)
위 코드를 실행하면 파티션 생성과 2일치 파티션을 위한 데이터가 적재됩니다. 데이터를 추출 해보겠습니다.
spark.sql(""" SELECT * FROM airbnb_db.listing_calendar WHERE listing_id = 35886541""").show()
DataFrame.partitionBy() 함수로 적재시 파티션이 자동생성된 것도 볼 수 있습니다. 파티션은 어떻게 관리하면 좋을지 이야기 해봅시다.
만약 사용자가 실수로 파티션이 없는 테이블에 대해 수많은 파티션을 자동으로 생성하게 되면 Hive Meastore 에 부하가 가 단순 읽기 작업을 하는 Spark 작업도 밀릴 수 있습니다.
따라서 대량 Partition 추가는 Spark 작업과는 별개로 관리하고, 필요하다면 미리 해 놓는 편이 좋습니다.
단건 (= 1건) Partition 추가는 Spark 작업내에서 spark.sql("ADD PARTITION IF NOT EXISTS") 로 관리한다면 편리하게 운영할 수 있습니다.
혹은 spark.sql.sources.partitionOverwriteMode = 'dynamic' 를 이용할 수 있으나, partitionBy() 에 파티션 생성을 맡기는 것은 데이터의 유무로 인해 불확정적이므로 Spark Job 시작 부분에서 실행되는 ADD PARTITION IF NOT EXISTS 가 나을 수 있습니다.