GCP

Cloud Function으로 SAP OData 를 BigQuery에 적재하기

whistory 2022. 10. 17. 13:39
반응형

 

 

https://whiseung.tistory.com/m/entry/Cloud-Data-FusionCDF%EB%A1%9C-SAP-OData-%EB%A5%BC-BigQuery%EC%97%90-%EC%A0%80%EC%9E%A5%ED%95%A0%EB%95%8C-%EB%B2%8C%EC%96%B4%EC%A7%80%EB%8A%94-%EC%9D%B4%EC%8A%88

 

Cloud Data Fusion(CDF)으로 SAP OData 를 BigQuery에 저장할 때 벌어진 이슈

SAP 의 데이터를 OData를 이용해 BIgQuery에 저장하려고 한다. GCP 의 Data Fusion을 이용해 데이터를 적재한다. 데이터가 적은 경우는 괜찮았지만, 1만건 이상의 데이터를 조회할 때마다 데이터가 다르게

whiseung.tistory.com

 

 

Data Fusion 으로 정상적인 데이터를 가져오는데 실패했다.

 

납기일은 맞춰야하고, 

SAP OData는 URL 호출이기 때문에 급한대로  Cloud Function으로 개발을 시작한다.

 

 

pyodata 를 이용해 데이터를 가져오고,

df 에 데이터를 담아

bigquery 에 적재하는 방식이다.

 

service account json 파일이 필요하고.

schema 정보를 가지고 있는 json 작성이 필요하다.

 

main.py

from google.cloud import bigquery
import os
import requests
import pyodata
import pandas as pd
import google.auth
import google.auth.transport.requests
from google.oauth2 import service_account
import datetime
import json 
import logging
import googleapiclient
from googleapiclient.discovery import build

def exec_odata(request):
    now = datetime.datetime.now()
    logging.info("start! {}".format(now))
    
    auth_req = google.auth.transport.requests.Request()
    credentials = service_account.Credentials.from_service_account_file(filename='service_account.json',scopes=['https://www.googleapis.com/auth/cloud-platform'])

    service = googleapiclient.discovery.build('iam', 'v1', credentials=credentials)
    credentials.refresh(auth_req)
    token = credentials.token
    
    with open("entity.json", 'r') as json_file:
	    info_json = json.load(json_file)

    
    prod_sap_url = 'https://odata.sap.com:8443/sap/opu/odata/sap/ODATA'

    for i in range(len(info_json.keys())):
        entitiy_svc = list(info_json.keys())[i]
        
        SERVICE_URL = prod_sap_url + entitiy_svc
        session = requests.Session()
        session.auth = ('ID', 'PASSWORD')

        oclient = pyodata.Client(SERVICE_URL, session)
    
        info_json2 = info_json[entitiy_svc]

        entitiy2 = info_json2['entity_2']
        cols = info_json2['col']
        schema = info_json2['bg_schema']
        bqtable = info_json2['bq_table']

        for i in range(len(cols)):
            exec("list%s = []" % (cols[i]))

        df = pd.DataFrame({})
        print("start to data on mem for {}".format(entitiy_svc))

        for i in range(len(cols)):
            exec("list_%s = []" % (cols[i]))
    

        executestr = '' 
        for i in range(len(cols)):
	        executestr += "\n\tlist_{}.append(entities.{})".format(cols[i], cols[i])

        fullstr = "for entities in oclient.entity_sets.{}.get_entities().execute():\n\t".format(entitiy2) + executestr
        exec(fullstr)

        for i in range(len(cols)):
            exec("df[cols[i]] = list_%s" % (cols[i]))

        print(df)

        print("save to bigquery for {}".format(entitiy_svc))
        project_id = 'ecorbit-dev'
        df.to_gbq(bqtable,project_id=project_id,if_exists='replace', table_schema=schema)

        now2 = datetime.datetime.now()
        print("end of {}! It took {}".format(entitiy_svc, now2-now))

    return "all of pipeline is done!"

 

requirements.txt

# Function dependencies, for example:
# package>=version

#google==3.0.0
#google-api-core==2.8.1
#google-api-python-client==2.49.0
#google-auth==2.3.1
#google-auth-httplib2==0.1.0
#google-auth-oauthlib==0.4.6
#google-cloud==0.34.0
#google-cloud-bigquery==3.2.0
#google-cloud-bigquery-storage==2.14.1
#google-cloud-core==2.3.0
#google-cloud-datastore==2.6.0
#google-cloud-storage==2.3.0
#google-crc32c==1.3.0
#logging
requests
pyodata
datetime
pandas
google-auth
google-cloud-bigquery
google-api-python-client
pandas-gbq

 

entitiy.json

{
    "ZCV_USER_VIEW_CDS" : {
        "col" : ["bukrs"
				,"pernr"
				,"sname"
				,"dept_txt"
				,"pos_text"
				,"job_text"],
        "entity_2" : "zcv_user_view",
        "bg_schema" : [
			 {"name":"bukrs","type":"STRING"}
			,{"name":"pernr","type":"STRING"}
            ,{"name":"sname","type":"STRING"}
            ,{"name":"dept_txt","type":"STRING"}
            ,{"name":"pos_text","type":"STRING"}
			,{"name":"job_text","type":"STRING"}
            ],
    "bq_table" : "project.dataset.ZCV_USER_VIEW"
    },
    "ZCV_BPARTNER_VIEW_CDS" : {
        "col" : ["bpcode"
				,"bu_group"
				,"name1"
				,"vkorg"
				,"taxnum"],
        "entity_2" : "ZCV_BPARTNER_VIEW",
        "bg_schema" : [
			 {"name":"bpcode","type":"STRING"}
			,{"name":"bu_group","type":"STRING"}
            ,{"name":"name1","type":"STRING"}
            ,{"name":"vkorg","type":"STRING"}
            ,{"name":"taxnum","type":"STRING"}
            ],
    "bq_table" : "project.dataset.ZCV_BPARTNER_VIEW"
    }
}

 

service_account.json

{
    "type": "service_account",
    "project_id": "projectid",
    "private_key_id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "private_key": "-----BEGIN PRIVATE KEY-----\nxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n-----END PRIVATE KEY-----\n",
    "client_email": "datafusion-api-dev@projectid.iam.gserviceaccount.com",
    "client_id": "xxxxxxxxxxxxxxxxxxx",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/datafusion-api-dev%40ecorbit-dev.iam.gserviceaccount.com"
}

 

 

개발 완료 후,

Cloud Scheduler 에 스케쥴을 등록하면 된다.

 

 

 

 

결과적으로 개발공수를 통해

시간적인 비용과 리소스 비용을 모두 아낄수 있었다.

 

Cloud Data Fusion 은

1개의 job을 실행하기 위해 Dataproc을 생성해 프로비져닝 하는데 6분 정도의 시간이 소요되고, 비용도 추가되었다.

 

하지만 Cloud Fuction을 통해

프로비져닝 시간을 줄이고, Dataproc 인스턴스 비용까지 절약 할 수 있게되엇다.

 

 

 

(강제로) Data Fusion Studio  의 UI를 포기하고 얻은 성과인듯 싶다.

 

 

 

 

 

 

 

반응형