반응형
Data Fusion 으로 정상적인 데이터를 가져오는데 실패했다.
납기일은 맞춰야하고,
SAP OData는 URL 호출이기 때문에 급한대로 Cloud Function으로 개발을 시작한다.
pyodata 를 이용해 데이터를 가져오고,
df 에 데이터를 담아
bigquery 에 적재하는 방식이다.
service account json 파일이 필요하고.
schema 정보를 가지고 있는 json 작성이 필요하다.
main.py
from google.cloud import bigquery
import os
import requests
import pyodata
import pandas as pd
import google.auth
import google.auth.transport.requests
from google.oauth2 import service_account
import datetime
import json
import logging
import googleapiclient
from googleapiclient.discovery import build
def exec_odata(request):
now = datetime.datetime.now()
logging.info("start! {}".format(now))
auth_req = google.auth.transport.requests.Request()
credentials = service_account.Credentials.from_service_account_file(filename='service_account.json',scopes=['https://www.googleapis.com/auth/cloud-platform'])
service = googleapiclient.discovery.build('iam', 'v1', credentials=credentials)
credentials.refresh(auth_req)
token = credentials.token
with open("entity.json", 'r') as json_file:
info_json = json.load(json_file)
prod_sap_url = 'https://odata.sap.com:8443/sap/opu/odata/sap/ODATA'
for i in range(len(info_json.keys())):
entitiy_svc = list(info_json.keys())[i]
SERVICE_URL = prod_sap_url + entitiy_svc
session = requests.Session()
session.auth = ('ID', 'PASSWORD')
oclient = pyodata.Client(SERVICE_URL, session)
info_json2 = info_json[entitiy_svc]
entitiy2 = info_json2['entity_2']
cols = info_json2['col']
schema = info_json2['bg_schema']
bqtable = info_json2['bq_table']
for i in range(len(cols)):
exec("list%s = []" % (cols[i]))
df = pd.DataFrame({})
print("start to data on mem for {}".format(entitiy_svc))
for i in range(len(cols)):
exec("list_%s = []" % (cols[i]))
executestr = ''
for i in range(len(cols)):
executestr += "\n\tlist_{}.append(entities.{})".format(cols[i], cols[i])
fullstr = "for entities in oclient.entity_sets.{}.get_entities().execute():\n\t".format(entitiy2) + executestr
exec(fullstr)
for i in range(len(cols)):
exec("df[cols[i]] = list_%s" % (cols[i]))
print(df)
print("save to bigquery for {}".format(entitiy_svc))
project_id = 'ecorbit-dev'
df.to_gbq(bqtable,project_id=project_id,if_exists='replace', table_schema=schema)
now2 = datetime.datetime.now()
print("end of {}! It took {}".format(entitiy_svc, now2-now))
return "all of pipeline is done!"
requirements.txt
# Function dependencies, for example:
# package>=version
#google==3.0.0
#google-api-core==2.8.1
#google-api-python-client==2.49.0
#google-auth==2.3.1
#google-auth-httplib2==0.1.0
#google-auth-oauthlib==0.4.6
#google-cloud==0.34.0
#google-cloud-bigquery==3.2.0
#google-cloud-bigquery-storage==2.14.1
#google-cloud-core==2.3.0
#google-cloud-datastore==2.6.0
#google-cloud-storage==2.3.0
#google-crc32c==1.3.0
#logging
requests
pyodata
datetime
pandas
google-auth
google-cloud-bigquery
google-api-python-client
pandas-gbq
entitiy.json
{
"ZCV_USER_VIEW_CDS" : {
"col" : ["bukrs"
,"pernr"
,"sname"
,"dept_txt"
,"pos_text"
,"job_text"],
"entity_2" : "zcv_user_view",
"bg_schema" : [
{"name":"bukrs","type":"STRING"}
,{"name":"pernr","type":"STRING"}
,{"name":"sname","type":"STRING"}
,{"name":"dept_txt","type":"STRING"}
,{"name":"pos_text","type":"STRING"}
,{"name":"job_text","type":"STRING"}
],
"bq_table" : "project.dataset.ZCV_USER_VIEW"
},
"ZCV_BPARTNER_VIEW_CDS" : {
"col" : ["bpcode"
,"bu_group"
,"name1"
,"vkorg"
,"taxnum"],
"entity_2" : "ZCV_BPARTNER_VIEW",
"bg_schema" : [
{"name":"bpcode","type":"STRING"}
,{"name":"bu_group","type":"STRING"}
,{"name":"name1","type":"STRING"}
,{"name":"vkorg","type":"STRING"}
,{"name":"taxnum","type":"STRING"}
],
"bq_table" : "project.dataset.ZCV_BPARTNER_VIEW"
}
}
service_account.json
{
"type": "service_account",
"project_id": "projectid",
"private_key_id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"private_key": "-----BEGIN PRIVATE KEY-----\nxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n-----END PRIVATE KEY-----\n",
"client_email": "datafusion-api-dev@projectid.iam.gserviceaccount.com",
"client_id": "xxxxxxxxxxxxxxxxxxx",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/datafusion-api-dev%40ecorbit-dev.iam.gserviceaccount.com"
}
개발 완료 후,
Cloud Scheduler 에 스케쥴을 등록하면 된다.
결과적으로 개발공수를 통해
시간적인 비용과 리소스 비용을 모두 아낄수 있었다.
Cloud Data Fusion 은
1개의 job을 실행하기 위해 Dataproc을 생성해 프로비져닝 하는데 6분 정도의 시간이 소요되고, 비용도 추가되었다.
하지만 Cloud Fuction을 통해
프로비져닝 시간을 줄이고, Dataproc 인스턴스 비용까지 절약 할 수 있게되엇다.
(강제로) Data Fusion Studio 의 UI를 포기하고 얻은 성과인듯 싶다.
반응형
'GCP' 카테고리의 다른 글
JAVA 프로젝트로 GCP BigQuery의 Bulk INSERT(Streaming buffer) 테이블 truncate 이슈 (0) | 2022.11.14 |
---|---|
JAVA 프로젝트로 GCP BigQuery의 Bulk INSERT(Streaming buffer) (0) | 2022.11.07 |
JAVA 프로젝트로 GCP BigQuery의 데이터 조작하기(Insert / Update) (0) | 2022.11.04 |
JAVA 프로젝트로 GCP BigQuery의 데이터 조회하기 (0) | 2022.10.31 |
Cloud Data Fusion(CDF)으로 SAP OData 를 BigQuery에 저장할 때 벌어진 이슈 (0) | 2022.10.14 |