AWS

AWS DMS로 가져온 Transaction 정보를 이용해 databricks delta table 에서 merge into 하기

whistory 2023. 3. 6. 10:25
반응형

 

 

 

 

 

AWS DMS(Data Migration Service)를 사용하여 Aurora PostgreSQL 를 AWS S3에 날짜 기준으로 파티셔닝 하여 적재

AWS DMS 를 사용해 Aurora PostgreSQL → AWS S3로 파티셔닝 해 적재해본다. 1. 복제 인스턴스 생성 VPC와 subnetgroup을 잘 선택하여 생성한다. 생성 시 ‘퍼블릭 액세스 가능’ 가 선택되어야 S3 접근이 가능

whiseung.tistory.com

 

 

databricks 에서 Storage Credential 과 External Location 등록해 AWS S3를 Table로 생성하기.

Databricks 에서 S3에 파일을 가져와 Table을 생성해본다. 1. AWS 역할(role) 생성 1.1 역할 생성을 위해, managed(account) console 에서 Account ID를 확인한다. 1.2 AWS 콘솔에서 IAM으로 이동. [역할] - [역할 만들기]

whiseung.tistory.com

 

 

💡 AWS DMS를 통해 Aurora PostgreSQL 에서 AWS S3로 가져온 transaction 정보를
     Databricks Delta Table에서 반영한다.

 

 

1. CDC 된 parquet 파일을 dataframe 으로 불러옴

import datetime 
now_date = datetime.datetime.now()
exec_date = datetime.datetime.strftime(now_date, '%Y/%m/%d')

cdc_s3_full_path = f"s3://bucket_name/postgresql_transaction/schema_name/table_name/{exec_date}"
df = spark.read.format("parquet").option("header", "false").load(cdc_s3_full_path)

from pyspark.sql.functions import col
sortDF = df.sort(col("transact_seq").asc())
sortDF.show()

 

 

 

2. pk 별로 partition by 진행, pk의 max transaction_seq 구하기

import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import max

windowSpec = Window.partitionBy(df.col01).orderBy(df.transact_seq).rangeBetween(-sys.maxsize, sys.maxsize)
maxSeqDF = df.withColumn("max_seq", max(col("transact_seq")).over(windowSpec))
maxSeqDF.show()

 

3. merge 할 최종 dataframe 생성

insertDF = maxSeqDF.filter("transact_seq=max_seq").filter("Op='I'")
updateDF = maxSeqDF.filter("transact_seq=max_seq").filter("Op IN ('U', 'D')")
finalDF = insertDF.unionAll(updateDF)
finalDF.show()

 

4. dataframe을 테이블로 만들기

finalDF.createOrReplaceTempView("cdcData")
spark.sql("select * from cdcData").show()

 

5. merge into 문 수행

5.1 sql

%sql
MERGE INTO `catalog`.default.test_tbl_003 as T
USING (SELECT Op, col01, col02, col03, col04, col05 FROM cdcData) s
ON t.col01 = s.col01
WHEN MATCHED AND s.Op = 'D' THEN DELETE
WHEN MATCHED AND s.OP = 'U' THEN UPDATE SET t.col02 = s.col02, t.col03 = s.col03, t.col04 = s.col04, t.col05 = s.col05
WHEN NOT MATCHED AND s.OP = 'I' THEN INSERT ( col01, col02, col03, col04, col05) VALUES ( s.col01, s.col02, s.col03, s.col04, s.col05)

5.2 spark

spark.sql("MERGE INTO `catalog`.default.test_tbl_003 as T \\
            USING (SELECT Op, col01, col02, col03, col04, col05 FROM cdcData) s \\
            ON t.col01 = s.col01 \\
            WHEN MATCHED AND s.Op = 'D' THEN DELETE \\
            WHEN MATCHED AND s.OP = 'U' THEN UPDATE SET t.col02 = s.col02, t.col03 = s.col03, t.col04 = s.col04, t.col05 = s.col05 \\
            WHEN NOT MATCHED AND s.OP = 'I' THEN INSERT ( col01, col02, col03, col04, col05) VALUES ( s.col01, s.col02, s.col03, s.col04, s.col05) \\
           ").show()

 

6. 확인

spark.sql("SELECT * FROM `ssts-test`.default.test_tbl_003 ORDER BY cast(col01 as int) DESC").show()

반응형