Copying datasets | BigQuery Data Transfer Service | Google Cloud
The default code provided in the Google Cloud documentation is shown below.
It really is, quite literally, test-only code.
from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

destination_project_id = "my-destination-project"
destination_dataset_id = "my_destination_dataset"
source_project_id = "my-source-project"
source_dataset_id = "my_source_dataset"

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id=destination_dataset_id,
    display_name="Your Dataset Copy Name",
    data_source_id="cross_region_copy",
    params={
        "source_project_id": source_project_id,
        "source_dataset_id": source_dataset_id,
    },
    schedule="every 24 hours",
)

transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path(destination_project_id),
    transfer_config=transfer_config,
)

print(f"Created transfer config: {transfer_config.name}")
It has no information about the config settings used when creating the DTS job,
and it is a code sample that really only performs a plain copy.
So I put together the script below, which picks a service-account key per project, sets the scheduling options, and loops over every dataset in the source project to copy it.
from google.cloud import bigquery_datatransfer
from google.oauth2 import service_account
from google.cloud import bigquery
import sys


## Select the project service-account key file based on the command-line argument
def keySetting():
    try:
        arg = sys.argv[1]
    except IndexError:
        print("The project is required.")
        sys.exit(1)

    if arg == "1":    # project #1
        key_path = "/home/whiseung/workspace/keys/project1.json"
    elif arg == "2":  # project #2
        key_path = "/home/whiseung/workspace/keys/project2.json"
    elif arg == "3":  # project #3
        key_path = "/home/whiseung/workspace/keys/project3.json"
    else:
        print("The project does not exist.")
        sys.exit(1)
    return key_path


## Build a protobuf Timestamp for the schedule settings (KST -> UTC, i.e. -9 hours)
def gen_timestamp(date_string):
    from datetime import datetime, timedelta
    from google.protobuf.timestamp_pb2 import Timestamp

    cal_time = datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f') - timedelta(hours=9, minutes=0)
    t = cal_time.timestamp()
    seconds = int(t)
    nanos = int((t - seconds) * 10**9)
    proto_timestamp = Timestamp(seconds=seconds, nanos=nanos)
    return proto_timestamp


## Create the destination dataset if it does not exist
def create_target_dataset(get_target_projectId, get_dataset, key_path):
    from google.cloud.exceptions import NotFound

    # target_location = "US"             # US
    target_location = "asia-northeast3"  # Seoul region
    client = bigquery.Client.from_service_account_json(key_path)  # Construct a BigQuery client object.
    try:
        client.get_dataset(get_dataset)  # Make an API request.
        print("\t\tDataset {} already exists".format(get_dataset))
    except NotFound:
        print("\t\tDataset {} is not found".format(get_dataset))
        dataset = bigquery.Dataset(get_target_projectId + "." + get_dataset)
        dataset.location = target_location
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
        print("\t\tCreated dataset {}.{}".format(client.project, dataset.dataset_id))


## Copy a dataset via a DTS transfer config
def copy_dataset(get_target_projectId, get_source_projectId, get_source_dataset, get_target_dataset, key_path):
    transfer_client = bigquery_datatransfer.DataTransferServiceClient.from_service_account_json(key_path)

    destination_project_id = get_target_projectId
    destination_dataset_id = get_target_dataset
    source_project_id = get_source_projectId
    source_dataset_id = get_source_dataset

    print(f"\tCopy From [{source_project_id}.{source_dataset_id}] to [{destination_project_id}.{destination_dataset_id}]")

    transfer_config = bigquery_datatransfer.TransferConfig(
        destination_dataset_id=destination_dataset_id,
        display_name="seoul_origin_" + get_target_dataset,
        data_source_id="cross_region_copy",  ## fixed data source id for dataset copy
        schedule_options={
            "disable_auto_scheduling": False,  # If True, automatic scheduling of transfer runs for this config is disabled.
            "start_time": gen_timestamp("2022-03-09 01:50:00.321952"),
            "end_time": gen_timestamp("2023-01-15 12:12:12.321952")
        },
        params={
            "source_project_id": source_project_id,
            "source_dataset_id": source_dataset_id,
            "overwrite_destination_table": True  ## overwrite destination tables when the daily incremental batch is loaded
        }
    )

    transfer_config = transfer_client.create_transfer_config(
        parent=transfer_client.common_project_path(destination_project_id),
        transfer_config=transfer_config
    )

    print(f"\t[Target - {get_source_dataset}] Created transfer config: {transfer_config.name}")


## Main entry point
if __name__ == "__main__":
    key_path = keySetting()

    source_client = bigquery.Client.from_service_account_json(key_path)
    source_datasets = list(source_client.list_datasets())  # Make an API request.
    source_project = source_client.project

    ## Run the dataset loop
    if source_datasets:
        print("Source Datasets in project {}:".format(source_project))
        ## Loop over every dataset that exists in the source project
        for source_dataset in source_datasets:
            create_target_dataset(source_project, source_dataset.dataset_id + "_bak", key_path)
            copy_dataset(source_project, source_project, source_dataset.dataset_id, source_dataset.dataset_id + "_bak", key_path)
    else:
        print("{} Source project does not contain any datasets.".format(source_project))
Constraints on using DTS.