반응형
이 프로세스로 openAPI를 가져와 BigQuery에 데이터를 저장하는 프로그램 개발한다.
테스트 중 InsertAll 로 저장한 테이블이 마음대로 컨트롤되지 않는다.
찾아보니 :
쿼리를 날려도 에러가 발생한다.
동일한 에러가 발생한다.
고민끝에 TRUNCATE 가 아닌방법을 고심해본다.
TRUNCATE → INSERTALL (실패)
DELETE(DROP) → CREATE → INSERTALL (실패)
DELETE(DROP) → CREATE → TIMER → INSERTALL (성공)
sleep 을 통해 일정시간 후에 insertAll 을 실행하여 문제를 해결했다.
void contextLoads() throws Exception {
System.out.println("############### start ###############");
String projectId = "project_id";
String datasetName = "dataset_name";
String tableName = "table_name";
/*
delete(drop) data
*/
Boolean deleteResult = deleteBigqueryTable(projectId, datasetName, tableName);
System.out.println("Delete ==> " + deleteResult);
/*
create data
*/
Schema schema =
Schema.of(
Field.of("col01", StandardSQLTypeName.STRING),
Field.of("col02", StandardSQLTypeName.NUMERIC),
Field.of("col03", StandardSQLTypeName.DATETIME));
Boolean createResult = createBigqueryTable(projectId, datasetName, tableName, schema);
System.out.println("Create ==> " + createResult);
// streaming insert를 위한 타이머.
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("error");
}
/*
bulk insert data
대량 데이터 insert
*/
Boolean bulkInsertResult1 = bulkInsertBigQuery(projectId, datasetName, tableName, "test1");
System.out.println("Bulk insert ==> " + bulkInsertResult1);
Boolean bulkInsertResult2 = bulkInsertBigQuery(projectId, datasetName, tableName, "test2");
System.out.println("Bulk insert ==> " + bulkInsertResult2);
// Select
executeSelectQueryString(projectId, selectQueryString);
System.out.println("############### end ###############");
}
/**
* Delete(Drop) Bigquery table
* @param projectId
* @param datasetName
* @param tableName
*/
public static Boolean deleteBigqueryTable(String projectId, String datasetName, String tableName) {
try {
BigQuery bigQuery = getBigQuery(projectId);
TableId tableId = TableId.of(datasetName, tableName);
bigQuery.delete(tableId);
return true;
} catch ( Exception e ) {
e.printStackTrace();
return false;
}
}
/**
* Create Bigquery table
* @param projectId
* @param datasetName
* @param tableName
* @param schema
*/
public static Boolean createBigqueryTable(String projectId, String datasetName, String tableName, Schema schema) {
try {
BigQuery bigQuery = getBigQuery(projectId);
TableId tableId = TableId.of(datasetName, tableName);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
bigQuery.create(tableInfo);
return true;
} catch ( Exception e ) {
e.printStackTrace();
return false;
}
}
/**
* Stream insert
* @param projectId
* @param datasetName
* @param tableName
*/
public static Boolean bulkInsertBigQuery(String projectId, String datasetName, String tableName, String testValue) {
try {
BigQuery bigQuery = getBigQuery(projectId);
TableId tableId = TableId.of(projectId, datasetName, tableName);
List<InsertAllRequest.RowToInsert> rowContents = new ArrayList<>();
String bgDateTimeNow = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "T"
+ ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
Map<String, Object> rowContent = null;
for ( int i = 0 ; i < 10000 ; i++ ) {
String rowId = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSSSSS"))+i;
rowContent = new HashMap<>();
rowContent.put("col01", testValue);
rowContent.put("col02", i*10);
rowContent.put("col03", bgDateTimeNow);
// id는 중복된 행을 식별하는데 사용됨.
rowContents.add(InsertAllRequest.RowToInsert.of(rowId, rowContent));
}
InsertAllResponse response = bigQuery.insertAll(InsertAllRequest.newBuilder(tableId)
//rowId를 지정하면, 해당 row에 등록 동일한 rowId가 있으면 update
//.addRow(UUID.randomUUID().toString(), rowContent)
.setRows(rowContents)
// More rows can be added in the same RPC by invoking .addRow() on the builder
.build());
if ( response.hasErrors() ) {
// If any of the insertions failed, this lets you inspect the errors
int errorCount = response.getInsertErrors().size();
List <com.google.cloud.bigquery.BigQueryError> aaa = response.getInsertErrors().get(0);
System.out.println("Total Error Count : " + errorCount);
String errorReason = response.getErrorsFor(0).get(0).getReason();
String errorLocation = response.getErrorsFor(0).get(0).getLocation();
String errorMessage = response.getErrorsFor(0).get(0).getMessage();
System.out.println("Error Reason\\t: " + errorReason);
System.out.println("Error Location\\t: " + errorLocation);
System.out.println("Error Message\\t: " + errorMessage);
return false;
} else {
return true;
}
} catch ( Exception e ) {
e.printStackTrace();
return false;
}
}
반응형
'GCP > BigQuery on GCP' 카테고리의 다른 글
JAVA 프로젝트로 GCP BigQuery에 BULK INSERT(Streaming buffer)로 데이터 집어넣기 (0) | 2023.01.16 |
---|---|
JAVA 프로젝트로 GCP BigQuery의 INSERT/UPDATE 쿼리 실행 해보기 (0) | 2023.01.16 |
JAVA 프로젝트로 GCP BigQuery의 Select 쿼리 실행 해보기 (0) | 2023.01.10 |
JAVA 프로젝트로 GCP BigQuery의 Dataset 가져오기 (0) | 2022.10.28 |
BigQuery Data Transfer Service 샘플 코드 (1) | 2022.09.07 |