GCP

JAVA 프로젝트로 GCP BigQuery의 Bulk INSERT(Streaming buffer) 테이블 truncate 이슈

whistory 2022. 11. 14. 14:43
반응형

 

 

이 프로세스로 API에서 데이터를 가져와 BigQuery에 저장하는 프로그램을 개발한다.

그런데 테스트 중, InsertAll로 데이터를 저장한 테이블이 의도대로 제어(TRUNCATE 등)되지 않는 문제가 발생했다.

 

 

 

 

구글링해 보니, InsertAll(스트리밍 삽입)로 저장한 데이터는 스트리밍 버퍼에 머무는 일정 시간 동안 삭제하거나 변경할 수 없다고 한다.

BigQuery: 404 "Table is truncated." when insert right after truncate

 

BigQuery: 404 "Table is truncated." when insert right after truncate

I truncate my table by executing a queryJob described here: https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries "truncate table " + PROJECT_ID + "." +

stackoverflow.com

 

 

 

고민 끝에 TRUNCATE가 아닌 다른 방법을 시도해 본다.

TRUNCATE → INSERTALL (실패)

DELETE(DROP) → CREATE → INSERTALL (실패)

DELETE(DROP) → CREATE → TIMER → INSERTALL (성공)

void contextLoads() throws Exception {
    System.out.println("############### start ###############");

    String projectId = "ecorbit-dev";
    String datasetName = "test";
    String tableName = "java_test3";

    /*
        Drop the table instead of TRUNCATE: in the experiments described in this post,
        TRUNCATE followed by insertAll failed.
    */
    Boolean deleteResult = deleteBigqueryTable(projectId, datasetName, tableName);
    System.out.println("Delete ==> " + deleteResult);

    /*
        Re-create the table with the same schema.
    */
    Schema schema =
            Schema.of(
                    Field.of("col01", StandardSQLTypeName.STRING),
                    Field.of("col02", StandardSQLTypeName.NUMERIC),
                    Field.of("col03", StandardSQLTypeName.DATETIME));
    Boolean createResult = createBigqueryTable(projectId, datasetName, tableName, schema);
    System.out.println("Create ==> " + createResult);

    // Wait before streaming into the freshly re-created table; DROP -> CREATE -> INSERTALL
    // without a pause failed in the experiments above.
    // NOTE(review): the 10 s delay is empirical, not a documented guarantee — TODO confirm.
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller / test runner can still observe it.
        Thread.currentThread().interrupt();
        System.out.println("error");
    }

    /*
        Bulk (streaming) insert of test data.
    */
    Boolean bulkInsertResult1 = bulkInsertBigQuery(projectId, datasetName, tableName, "test1");
    System.out.println("Bulk insert ==> " + bulkInsertResult1);

    Boolean bulkInsertResult2 = bulkInsertBigQuery(projectId, datasetName, tableName, "test2");
    System.out.println("Bulk insert ==> " + bulkInsertResult2);

    // Select.
    // NOTE(review): selectQueryString is not declared in this method — presumably a field
    // defined elsewhere in the test class; verify.
    executeSelectQueryString(projectId, selectQueryString);

    System.out.println("############### end ###############");
}

/**
 * Deletes (drops) a BigQuery table.
 *
 * <p>The table is addressed with a fully qualified {@link TableId} (project, dataset, table),
 * so the target project is always {@code projectId}. The client's default project is not used.
 *
 * @param projectId   GCP project that owns the dataset
 * @param datasetName dataset containing the table
 * @param tableName   table to drop
 * @return {@code true} if the table was deleted; {@code false} if it was not found or the call
 *     threw (the stack trace is printed to stderr)
 */
public static Boolean deleteBigqueryTable(String projectId, String datasetName, String tableName) {
    try {
        BigQuery bigQuery = getBigQuery(projectId);
        TableId tableId = TableId.of(projectId, datasetName, tableName);

        // BigQuery.delete signals "table not found" through its return value rather than an
        // exception, so report it instead of unconditionally claiming success.
        return bigQuery.delete(tableId);
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}
/**
 * Creates a standard BigQuery table with the given schema.
 *
 * <p>The table is addressed with a fully qualified {@link TableId} (project, dataset, table),
 * so it is created in {@code projectId}. The client's default project is not used.
 *
 * @param projectId   GCP project that owns the dataset
 * @param datasetName dataset in which to create the table
 * @param tableName   name of the new table
 * @param schema      column definitions for the table
 * @return {@code true} if the create call returned normally; {@code false} if it threw
 *     (the stack trace is printed to stderr)
 */
public static Boolean createBigqueryTable(String projectId, String datasetName, String tableName, Schema schema) {
    try {
        BigQuery bigQuery = getBigQuery(projectId);

        TableId tableId = TableId.of(projectId, datasetName, tableName);
        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

        bigQuery.create(tableInfo);
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}
/**
 * Streams 10,000 synthetic test rows into a BigQuery table using {@code insertAll}.
 *
 * <p>Row {@code i} has {@code col01 = testValue}, {@code col02 = i * 10} and
 * {@code col03 =} the local date-time at call time. Each row carries an insertId that BigQuery
 * uses for best-effort de-duplication of retried rows. A repeated insertId does not update an
 * existing row.
 *
 * <p>NOTE(review): Google recommends much smaller batches per {@code insertAll} request than
 * 10,000 rows; consider chunking — TODO confirm against current streaming quotas.
 *
 * @param projectId   GCP project that owns the dataset
 * @param datasetName dataset containing the table
 * @param tableName   destination table
 * @param testValue   value written to {@code col01} of every row
 * @return {@code true} if the response reports no insert errors; {@code false} if any row failed
 *     (the first error is printed) or the call threw (the stack trace is printed to stderr)
 */
public static Boolean bulkInsertBigQuery(String projectId, String datasetName, String tableName, String testValue) {
    try {
        BigQuery bigQuery = getBigQuery(projectId);
        TableId tableId = TableId.of(projectId, datasetName, tableName);

        InsertAllResponse response = bigQuery.insertAll(InsertAllRequest.newBuilder(tableId)
                .setRows(buildBulkTestRows(testValue, 10000))
                .build());

        if (response.hasErrors()) {
            printFirstInsertError(response);
            return false;
        }
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

/** Builds {@code rowCount} test rows that all share a single timestamp taken at call time. */
private static List<InsertAllRequest.RowToInsert> buildBulkTestRows(String testValue, int rowCount) {
    // Read the clock once: formatting the date and the time from two separate now() calls can
    // combine two different days when the calls straddle midnight.
    ZonedDateTime now = ZonedDateTime.now();
    String bqDateTime = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));
    // Fixed-width timestamp prefix + row index => insertIds are unique within this batch.
    String rowIdPrefix = now.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSSSSS"));

    List<InsertAllRequest.RowToInsert> rows = new ArrayList<>(rowCount);
    for (int i = 0; i < rowCount; i++) {
        Map<String, Object> content = new HashMap<>();
        content.put("col01", testValue);
        content.put("col02", i * 10);
        content.put("col03", bqDateTime);
        rows.add(InsertAllRequest.RowToInsert.of(rowIdPrefix + i, content));
    }
    return rows;
}

/** Prints the number of failed rows and the first error of the lowest-indexed failed row. */
private static void printFirstInsertError(InsertAllResponse response) {
    // Keyed by the row's index within the request (Long); row 0 is not necessarily present.
    Map<Long, List<com.google.cloud.bigquery.BigQueryError>> insertErrors = response.getInsertErrors();
    System.out.println("Total Error Count : " + insertErrors.size());

    long firstFailedRow = Long.MAX_VALUE;
    for (Long rowIndex : insertErrors.keySet()) {
        firstFailedRow = Math.min(firstFailedRow, rowIndex);
    }

    List<com.google.cloud.bigquery.BigQueryError> rowErrors = response.getErrorsFor(firstFailedRow);
    if (rowErrors == null || rowErrors.isEmpty()) {
        return;
    }
    com.google.cloud.bigquery.BigQueryError error = rowErrors.get(0);
    System.out.println("Error Reason\t: " + error.getReason());
    System.out.println("Error Location\t: " + error.getLocation());
    System.out.println("Error Message\t: " + error.getMessage());
}

 

반응형