
Bulk Inserting Data into GCP BigQuery (Streaming Buffer) from a JAVA Project

whistory 2023. 1. 16. 14:42

Previous post: Running INSERT/UPDATE Queries on GCP BigQuery from a JAVA Project
https://whiseung.tistory.com/entry/JAVA-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8%EB%A1%9C-GCP-BigQuery%EC%9D%98-INSERTUPDATE-%EC%BF%BC%EB%A6%AC-%EB%82%A0%EB%A0%A4%EB%B3%B4%EA%B8%B0

In the previous post, I manipulated data using SQL statements.

This time, I implement a bulk insert that loads a large amount of data in a single call.

 

■ Caller

void contextLoads() throws Exception {
    System.out.println("############### start ###############");

    String projectId = "project_id";
    String datasetName = "dataset_name";
    String tableName = "table_name";

    Boolean bulkInsertResult1 = bulkInsertBigQuery(projectId, datasetName, tableName, "test1");
    System.out.println("Bulk insert ==> " + bulkInsertResult1);

    Boolean bulkInsertResult2 = bulkInsertBigQuery(projectId, datasetName, tableName, "test2");
    System.out.println("Bulk insert ==> " + bulkInsertResult2);

    System.out.println("############### end ###############");
}

■ Implementation

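// Imports needed by this method. getBigQuery(projectId) is not shown here; it is assumed
// to be the client helper from the earlier posts in this series.
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Streaming insert: sends 10,000 generated rows to a table with insertAll.
 * @param projectId   GCP project ID
 * @param datasetName BigQuery dataset name
 * @param tableName   BigQuery table name
 * @param testValue   value written to col01 of every row
 * @return true if every row was accepted, false on insert errors or an exception
 */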
public static Boolean bulkInsertBigQuery(String projectId, String datasetName, String tableName, String testValue) {
    try {
        BigQuery bigQuery = getBigQuery(projectId);

        TableId tableId = TableId.of(projectId, datasetName, tableName);
        List<InsertAllRequest.RowToInsert> rowContents = new ArrayList<>();

        // Capture the time once so the date and time parts cannot straddle a second boundary.
        String bgDateTimeNow = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));

        Map<String, Object> rowContent = null;
        for ( int i = 0 ; i < 10000 ; i++ ) {
            String rowId = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSSSSS"))+i;
            rowContent = new HashMap<>();
            rowContent.put("col01",  testValue);
            rowContent.put("col02",  i*10);
            rowContent.put("col03",  bgDateTimeNow);
            // The id (insertId) lets BigQuery detect duplicate rows on a best-effort basis.
            rowContents.add(InsertAllRequest.RowToInsert.of(rowId, rowContent));
        }

        // Log how many rows were queued (this produces the "Insert rows" line in the result).
        System.out.println("Insert rows : \t: " + rowContents.size());

        InsertAllResponse response = bigQuery.insertAll(InsertAllRequest.newBuilder(tableId)
                // A rowId enables best-effort de-duplication of retried rows; it does not update an existing row.
                //.addRow(UUID.randomUUID().toString(), rowContent)
                .setRows(rowContents)
                // More rows can be added in the same RPC by invoking .addRow() on the builder
                .build());

        if ( response.hasErrors() ) {
            // getInsertErrors() maps the index of each failed row to its list of errors.
            Map<Long, List<com.google.cloud.bigquery.BigQueryError>> insertErrors = response.getInsertErrors();
            System.out.println("Total Error Count : " + insertErrors.size());

            // Report the first failed row; row 0 is not guaranteed to be among the failures,
            // so looking it up directly could return null.
            Map.Entry<Long, List<com.google.cloud.bigquery.BigQueryError>> first = insertErrors.entrySet().iterator().next();
            com.google.cloud.bigquery.BigQueryError error = first.getValue().get(0);

            System.out.println("Error Row Index\t: " + first.getKey());
            System.out.println("Error Reason\t: " + error.getReason());
            System.out.println("Error Location\t: " + error.getLocation());
            System.out.println("Error Message\t: " + error.getMessage());
            return false;
        } else {
            return true;
        }
    } catch ( Exception e ) {
        e.printStackTrace();
        return false;
    }
}
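
One thing to watch in the code above: the rowId is built from the current time, so a retry of the same rows would generate different IDs and BigQuery's best-effort de-duplication could not match them. Below is a minimal sketch of a deterministic alternative. The class name StableRowIdSketch and the idea of a caller-supplied batch key are my own assumptions for illustration, not part of the BigQuery client.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical helper: derives a repeatable row ID from a batch key and a row index. */
final class StableRowIdSketch {

    /**
     * Returns the same ID every time for the same (batchKey, rowIndex) pair.
     * batchKey is an assumed caller-chosen label, e.g. a file name or job ID.
     */
    static String rowId(String batchKey, int rowIndex) {
        String seed = batchKey + ":" + rowIndex;
        // nameUUIDFromBytes is deterministic: equal input bytes give an equal UUID.
        return UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String first = rowId("test1", 0);
        String retry = rowId("test1", 0);
        System.out.println("Same ID on retry: " + first.equals(retry));
    }
}
```

With something like this, the loop could call RowToInsert.of(StableRowIdSketch.rowId(testValue, i), rowContent) so that a retried request carries the same IDs as the first attempt.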

■ Result

############### start ###############
Insert rows : 	: 10000
Bulk insert ==> true
Insert rows : 	: 10000
Bulk insert ==> true
############### end ###############
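
The run above sends all 10,000 rows in a single insertAll request, and it worked here. As far as I know, Google's streaming insert guidance suggests much smaller requests (around 500 rows), so splitting the list may be safer for larger rows. The following is only a sketch under that assumption: BatchedInsertSketch, BATCH_SIZE and the sender callback are hypothetical names, and the callback stands in for the actual insertAll call so the example runs without the BigQuery client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper: splits rows into fixed-size batches and sends each one. */
final class BatchedInsertSketch {

    // Assumed batch size; tune it for your row size and quota.
    static final int BATCH_SIZE = 500;

    /** Splits the list into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }

    /**
     * Passes every batch to the sender and returns the number of failed batches.
     * In real code the sender would wrap a call such as
     * bigQuery.insertAll(InsertAllRequest.newBuilder(tableId).setRows(batch).build())
     * and return true when the response has no errors.
     */
    static <T> int sendInBatches(List<T> rows, int batchSize, Predicate<List<T>> sender) {
        int failed = 0;
        for (List<T> batch : partition(rows, batchSize)) {
            if (!sender.test(batch)) {
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            rows.add(i);
        }
        // Dummy sender that always succeeds; replace it with the real insertAll call.
        int failed = sendInBatches(rows, BATCH_SIZE, batch -> true);
        System.out.println("Batches: " + partition(rows, BATCH_SIZE).size() + ", failed: " + failed);
    }
}
```

Counting failed batches instead of stopping at the first error is a design choice for this sketch; since each request succeeds or fails on its own, the caller can then retry only the batches that failed.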