
BigQuery Data Transfer Service 샘플 코드

whistory 2022. 9. 7. 16:15
Reference: Copy datasets | BigQuery Data Transfer Service | Google Cloud (cloud.google.com)

"Create a transfer configuration to copy all of the tables in a dataset across projects, locations, or both."

The basic code provided in the Google docs is shown below.

It is literally just for testing.

 

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

destination_project_id = "my-destination-project"
destination_dataset_id = "my_destination_dataset"

source_project_id = "my-source-project"
source_dataset_id = "my_source_dataset"

# "cross_region_copy" is the data source id used for dataset copies;
# the source project/dataset are passed through params.
transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id=destination_dataset_id,
    display_name="Your Dataset Copy Name",
    data_source_id="cross_region_copy",
    params={
        "source_project_id": source_project_id,
        "source_dataset_id": source_dataset_id,
    },
    schedule="every 24 hours",
)

# Register the config under the destination project.
transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path(destination_project_id),
    transfer_config=transfer_config,
)

print(f"Created transfer config: {transfer_config.name}")

 

 

 

It gives no information about the config settings available when creating a DTS job, and it really is a sample that does nothing more than run a copy.

The extended script below selects a service-account key per project, creates the missing *_bak destination datasets, and then creates a scheduled copy config for every dataset in the source project.

 

from google.cloud import bigquery_datatransfer
from google.oauth2 import service_account
from google.cloud import bigquery
import sys

## Select the project's service-account key based on the first CLI argument
def keySetting():
    try:
        arg = sys.argv[1]
    except IndexError:
        print("The project is required.")
        sys.exit(1)

    if arg == "1":      # project #1
        key_path = "/home/whiseung/workspace/keys/project1.json"
    elif arg == "2":    # project #2
        key_path = "/home/whiseung/workspace/keys/project2.json"
    elif arg == "3":    # project #3
        key_path = "/home/whiseung/workspace/keys/project3.json"
    else:
        print("The project does not exist.")
        sys.exit(1)

    return key_path

## Build a protobuf Timestamp for the schedule settings (KST input minus 9 hours = UTC)
def gen_timestamp(date_string):
    from datetime import datetime, timedelta
    from google.protobuf.timestamp_pb2 import Timestamp
    
    cal_time = datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f') - timedelta(hours=9, minutes=0)
    t = cal_time.timestamp()
    seconds = int(t)
    nanos = int((t - seconds) * 10**9)
    proto_timestamp = Timestamp(seconds=seconds, nanos=nanos)

    return proto_timestamp

## Create the target dataset if it does not exist
def create_target_dataset(get_target_projectId, get_dataset, key_path):
    from google.cloud.exceptions import NotFound

    # target_location = "US"     # US
    target_location = "asia-northeast3"     # Seoul Region

    client = bigquery.Client.from_service_account_json(key_path)    # Construct a BigQuery client object.

    try:
        client.get_dataset(get_dataset)     # Make an API request.
        print("\t\tDataset {} already exists".format(get_dataset))
    except NotFound:
        print("\t\tDataset {} is not found".format(get_dataset))

        dataset = bigquery.Dataset(get_target_projectId + "." + get_dataset)
        dataset.location = target_location
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
        print("\t\tCreated dataset {}.{}".format(client.project, dataset.dataset_id))


## Copy a dataset via a DTS transfer config
def copy_dataset(get_target_projectId, get_source_projectId, get_source_dataset, get_target_dataset, key_path):
    transfer_client = bigquery_datatransfer.DataTransferServiceClient.from_service_account_json(key_path)

    destination_project_id = get_target_projectId
    destination_dataset_id = get_target_dataset

    source_project_id = get_source_projectId
    source_dataset_id = get_source_dataset

    print(f"\tCopy From [{source_project_id}.{source_dataset_id}] to [{destination_project_id}.{destination_dataset_id}]")

    transfer_config = bigquery_datatransfer.TransferConfig(
        destination_dataset_id = destination_dataset_id,
        display_name = "seoul_origin_" + get_target_dataset,
        data_source_id = "cross_region_copy",       # fixed data source id for dataset copies
        schedule_options = {
            "disable_auto_scheduling": False,       # if True, automatic scheduling of transfer runs for this config is disabled
            "start_time": gen_timestamp("2022-03-09 01:50:00.321952"),
            "end_time": gen_timestamp("2023-01-15 12:12:12.321952")
        },
        params = {
            "source_project_id": source_project_id,
            "source_dataset_id": source_dataset_id,
            "overwrite_destination_table": True     # overwrite destination tables when loading incremental data in the daily batch
        }
    )

    transfer_config = transfer_client.create_transfer_config(
        parent = transfer_client.common_project_path(destination_project_id),
        transfer_config = transfer_config
    )

    print(f"\t[Target - {get_source_dataset}] Created transfer config: {transfer_config.name}")



## Entry point
if __name__ == "__main__":
    key_path = keySetting()

    source_client = bigquery.Client.from_service_account_json(key_path)
    source_datasets = list(source_client.list_datasets())  # Make an API request.
    source_project = source_client.project

    ## Run the dataset loop
    if source_datasets:
        print("Source Datasets in project {}:".format(source_project))
        
        ## Loop over every dataset in the source project
        for source_dataset in source_datasets:
            create_target_dataset(source_project, source_dataset.dataset_id+"_bak", key_path)
            copy_dataset(source_project, source_project, source_dataset.dataset_id, source_dataset.dataset_id+"_bak", key_path)
    else:
        print("{} Source project does not contain any datasets.".format(source_project))

 

 

 

 

Constraints on using DTS.
