GCP/BigQuery on GCP

JAVA 프로젝트로 GCP BigQuery TRUNCATE TABLE 후, bulk INSERT(Streaming buffer) "Table is truncated." 에러발생

whistory 2023. 1. 17. 10:43
반응형

이 프로세스로 openAPI를 가져와 BigQuery에 데이터를 저장하는 프로그램 개발한다.

 

테스트 중 InsertAll 로 저장한 테이블이 마음대로 컨트롤되지 않는다.

 

 

 

찾아보니 :

https://stackoverflow.com/questions/70013949/bigquery-404-table-is-truncated-when-insert-right-after-truncate

 

BigQuery: 404 "Table is truncated." when insert right after truncate

I truncate my table by executing a queryJob described here: https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries "truncate table " + PROJECT_ID + "." +

stackoverflow.com

 

쿼리를 날려도 에러가 발생한다.

 

 

 

 

 

 

동일한 에러가 발생한다.

고민끝에 TRUNCATE 가 아닌방법을 고심해본다.

 

TRUNCATE → INSERTALL (실패)

DELETE(DROP) → CREATE → INSERTALL (실패)

DELETE(DROP) → CREATE → TIMER → INSERTALL (성공)

 

sleep 을 통해 일정시간 후에 insertAll 을 실행하여 문제를 해결했다.

 

void contextLoads() throws Exception {
  System.out.println("############### start ###############");

  String projectId = "project_id";
  String datasetName = "dataset_name";
  String tableName = "table_name";

  /*
      delete(drop) data
  */
  Boolean deleteResult =  deleteBigqueryTable(projectId, datasetName, tableName);
  System.out.println("Delete ==> " + deleteResult);

  /*
      create data
  */
  Schema schema =
          Schema.of(
                  Field.of("col01", StandardSQLTypeName.STRING),
                  Field.of("col02", StandardSQLTypeName.NUMERIC),
                  Field.of("col03", StandardSQLTypeName.DATETIME));
  Boolean createResult = createBigqueryTable(projectId, datasetName, tableName, schema);
  System.out.println("Create ==> " + createResult);

  // streaming insert를 위한 타이머.
  try {
      Thread.sleep(10000);
  } catch (InterruptedException e) {
      System.out.println("error");
  }

  /*
      bulk insert data
      대량 데이터 insert
  */
  Boolean bulkInsertResult1 =  bulkInsertBigQuery(projectId, datasetName, tableName, "test1");
  System.out.println("Bulk insert ==> " + bulkInsertResult1);

  Boolean bulkInsertResult2 =  bulkInsertBigQuery(projectId, datasetName, tableName, "test2");
  System.out.println("Bulk insert ==> " + bulkInsertResult2);

  // Select
  executeSelectQueryString(projectId, selectQueryString);

  System.out.println("############### end ###############");
}
/**
 * Delete(Drop) Bigquery table
 * @param projectId
 * @param datasetName
 * @param tableName
 */
public static Boolean deleteBigqueryTable(String projectId, String datasetName, String tableName) {
    try {
        BigQuery bigQuery = getBigQuery(projectId);
        TableId tableId = TableId.of(datasetName, tableName);
        
        bigQuery.delete(tableId);
        return true;
    } catch ( Exception e ) {
        e.printStackTrace();
        return false;
    }
}

/**
 * Create Bigquery table
 * @param projectId
 * @param datasetName
 * @param tableName
 * @param schema
 */
public static Boolean createBigqueryTable(String projectId, String datasetName, String tableName, Schema schema) {
    try {
        BigQuery bigQuery = getBigQuery(projectId);

        TableId tableId = TableId.of(datasetName, tableName);
        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

        bigQuery.create(tableInfo);
        return true;
    } catch ( Exception e ) {
        e.printStackTrace();
        return false;
    }
}

/**
 * Stream insert
 * @param projectId
 * @param datasetName
 * @param tableName
 */
public static Boolean bulkInsertBigQuery(String projectId, String datasetName, String tableName, String testValue) {
    try {
        BigQuery bigQuery = getBigQuery(projectId);

        TableId tableId = TableId.of(projectId, datasetName, tableName);
        List<InsertAllRequest.RowToInsert> rowContents = new ArrayList<>();

        String bgDateTimeNow = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "T"
                             + ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));

        Map<String, Object> rowContent = null;
        for ( int i = 0 ; i < 10000 ; i++ ) {
            String rowId = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSSSSS"))+i;
            rowContent = new HashMap<>();
            rowContent.put("col01",  testValue);
            rowContent.put("col02",  i*10);
            rowContent.put("col03",  bgDateTimeNow);
            // id는 중복된 행을 식별하는데 사용됨.
            rowContents.add(InsertAllRequest.RowToInsert.of(rowId, rowContent));
        }

        InsertAllResponse response = bigQuery.insertAll(InsertAllRequest.newBuilder(tableId)
                //rowId를 지정하면, 해당 row에 등록 동일한 rowId가 있으면 update
                //.addRow(UUID.randomUUID().toString(), rowContent)
                .setRows(rowContents)
                // More rows can be added in the same RPC by invoking .addRow() on the builder
                .build());

        if ( response.hasErrors() ) {
            // If any of the insertions failed, this lets you inspect the errors
            int errorCount = response.getInsertErrors().size();
            List <com.google.cloud.bigquery.BigQueryError> aaa = response.getInsertErrors().get(0);
            System.out.println("Total Error Count : " + errorCount);

            String errorReason = response.getErrorsFor(0).get(0).getReason();
            String errorLocation = response.getErrorsFor(0).get(0).getLocation();
            String errorMessage = response.getErrorsFor(0).get(0).getMessage();

            System.out.println("Error Reason\\t: " + errorReason);
            System.out.println("Error Location\\t: " + errorLocation);
            System.out.println("Error Message\\t: " + errorMessage);
            return false;
        } else {
            return true;
        }
    } catch ( Exception e ) {
        e.printStackTrace();
        return false;
    }
}

 

 

 

 

 

 

 

 

 

반응형