반응형
💡 AWS DMS를 통해 Aurora PostgreSQL 에서 AWS S3로 가져온 transaction 정보를
Databricks Delta Table에서 반영한다.
1. CDC 된 parquet 파일을 dataframe 으로 불러옴
import datetime
now_date = datetime.datetime.now()
exec_date = datetime.datetime.strftime(now_date, '%Y/%m/%d')
cdc_s3_full_path = f"s3://bucket_name/postgresql_transaction/schema_name/table_name/{exec_date}"
df = spark.read.format("parquet").option("header", "false").load(cdc_s3_full_path)
from pyspark.sql.functions import col
sortDF = df.sort(col("transact_seq").asc())
sortDF.show()
2. pk 별로 partition by 진행, pk의 max transaction_seq 구하기
import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import max
windowSpec = Window.partitionBy(df.col01).orderBy(df.transact_seq).rangeBetween(-sys.maxsize, sys.maxsize)
maxSeqDF = df.withColumn("max_seq", max(col("transact_seq")).over(windowSpec))
maxSeqDF.show()
3. merge 할 최종 dataframe 생성
insertDF = maxSeqDF.filter("transact_seq=max_seq").filter("Op='I'")
updateDF = maxSeqDF.filter("transact_seq=max_seq").filter("Op IN ('U', 'D')")
finalDF = insertDF.unionAll(updateDF)
finalDF.show()
4. dataframe을 테이블로 만들기
finalDF.createOrReplaceTempView("cdcData")
spark.sql("select * from cdcData").show()
5. merge into 문 수행
5.1 sql
%sql
MERGE INTO `catalog`.default.test_tbl_003 as T
USING (SELECT Op, col01, col02, col03, col04, col05 FROM cdcData) s
ON t.col01 = s.col01
WHEN MATCHED AND s.Op = 'D' THEN DELETE
WHEN MATCHED AND s.OP = 'U' THEN UPDATE SET t.col02 = s.col02, t.col03 = s.col03, t.col04 = s.col04, t.col05 = s.col05
WHEN NOT MATCHED AND s.OP = 'I' THEN INSERT ( col01, col02, col03, col04, col05) VALUES ( s.col01, s.col02, s.col03, s.col04, s.col05)
5.2 spark
spark.sql("MERGE INTO `catalog`.default.test_tbl_003 as T \\
USING (SELECT Op, col01, col02, col03, col04, col05 FROM cdcData) s \\
ON t.col01 = s.col01 \\
WHEN MATCHED AND s.Op = 'D' THEN DELETE \\
WHEN MATCHED AND s.OP = 'U' THEN UPDATE SET t.col02 = s.col02, t.col03 = s.col03, t.col04 = s.col04, t.col05 = s.col05 \\
WHEN NOT MATCHED AND s.OP = 'I' THEN INSERT ( col01, col02, col03, col04, col05) VALUES ( s.col01, s.col02, s.col03, s.col04, s.col05) \\
").show()
6. 확인
spark.sql("SELECT * FROM `ssts-test`.default.test_tbl_003 ORDER BY cast(col01 as int) DESC").show()
반응형
'AWS' 카테고리의 다른 글
databricks 에서 Storage Credential 과 External Location 등록해 AWS S3를 Table로 생성하기. (0) | 2023.02.27 |
---|---|
AWS DMS(Data Migration Service)를 사용하여 Aurora PostgreSQL 를 AWS S3에 날짜 기준으로 파티셔닝 하여 적재 (0) | 2023.02.23 |
AWS Glue로 S3 에 저장된 parquet 파일 읽어보기 (0) | 2023.01.05 |
AWS Glue에서 ngdbc 를 이용해 SAP HANA CLOUD 데이터를 S3에 저장하기 (0) | 2023.01.04 |
AWS Glue에서 hdbcli 를 이용해 SAP BW 데이터를 S3에 저장하기 (0) | 2023.01.04 |