GCP

JAVA 프로젝트로 GCP BigQuery의 Bulk INSERT(Streaming buffer)

whistory 2022. 11. 7. 11:13
반응형

 

 

 

 

 

void contextLoads() throws Exception {
        // Driver for the streaming-insert sample: runs bulkInsertBigQuery once per
        // test value against the same target table and prints each boolean result
        // between the start/end banners.
        final String projectId = "ecorbit-dev";
        final String datasetName = "test";
        final String tableName = "java_test3";

        System.out.println("############### start ###############");

        // Each value ends up in col01 of every row produced by that call.
        for (String testValue : new String[] {"test1", "test2"}) {
            Boolean inserted = bulkInsertBigQuery(projectId, datasetName, tableName, testValue);
            System.out.println("Bulk insert ==> " + inserted);
        }

        System.out.println("############### end ###############");
    }
/**
     * Streams 10,000 generated rows into a BigQuery table with a single
     * {@code insertAll} request.
     *
     * <p>Every row carries {@code col01 = testValue}, {@code col02 = i * 10} (for
     * {@code i} in 0..9999) and {@code col03 =} the timestamp taken once at the start
     * of the call, formatted as {@code yyyy-MM-dd'T'HH:mm:ss} in the JVM default zone.
     *
     * <p>On failure the number of failed rows and the first reported error are printed
     * to standard output. Exceptions are not propagated: the stack trace is printed and
     * {@code false} is returned.
     *
     * @param projectId   GCP project id, passed to {@code getBigQuery} and used in the table id
     * @param datasetName dataset that contains the target table
     * @param tableName   target table name
     * @param testValue   value written to {@code col01} of every row
     * @return {@code true} if the response reports no insert errors; {@code false} if it
     *         reports errors or if any exception was thrown
     */
    public static Boolean bulkInsertBigQuery(String projectId, String datasetName, String tableName, String testValue) {
        try {
            BigQuery bigQuery = getBigQuery(projectId);
            TableId tableId = TableId.of(projectId, datasetName, tableName);

            // Formatters are immutable; build them once instead of once per row.
            DateTimeFormatter timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
            DateTimeFormatter rowIdFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSSSSS");

            // Take ONE instant for col03. Formatting the date and the time from two
            // separate now() calls can pair yesterday's date with today's time when
            // the call straddles midnight.
            String bgDateTimeNow = ZonedDateTime.now().format(timestampFormat);

            int rowCount = 10000;
            List<InsertAllRequest.RowToInsert> rowContents = new ArrayList<>(rowCount);
            for (int i = 0; i < rowCount; i++) {
                // The row id is sent as the row's insertId, which BigQuery uses to
                // identify duplicate rows (it does not update an existing row).
                // The fixed-width timestamp prefix plus the loop index keeps ids
                // distinct within one call.
                String rowId = ZonedDateTime.now().format(rowIdFormat) + i;

                Map<String, Object> rowContent = new HashMap<>();
                rowContent.put("col01", testValue);
                rowContent.put("col02", i * 10);
                rowContent.put("col03", bgDateTimeNow);
                rowContents.add(InsertAllRequest.RowToInsert.of(rowId, rowContent));
            }

            // All rows travel in a single request.
            InsertAllResponse response = bigQuery.insertAll(
                    InsertAllRequest.newBuilder(tableId)
                            .setRows(rowContents)
                            .build());

            if (!response.hasErrors()) {
                return true;
            }

            // Keys are the indexes of the rows that failed; values are that row's errors.
            Map<Long, List<com.google.cloud.bigquery.BigQueryError>> insertErrors = response.getInsertErrors();
            System.out.println("Total Error Count : " + insertErrors.size());

            // Report the first failed row that actually carries an error. Row 0 is not
            // guaranteed to be among the failures, so do not index it blindly.
            for (Map.Entry<Long, List<com.google.cloud.bigquery.BigQueryError>> failedRow : insertErrors.entrySet()) {
                List<com.google.cloud.bigquery.BigQueryError> rowErrors = failedRow.getValue();
                if (rowErrors == null || rowErrors.isEmpty()) {
                    continue;
                }
                com.google.cloud.bigquery.BigQueryError firstError = rowErrors.get(0);
                System.out.println("Error Reason\t: " + firstError.getReason());
                System.out.println("Error Location\t: " + firstError.getLocation());
                System.out.println("Error Message\t: " + firstError.getMessage());
                break;
            }
            return false;
        } catch (Exception e) {
            // Sample-level boundary: report the failure and signal it through the return value.
            e.printStackTrace();
            return false;
        }
    }
############### start ###############
Insert rows : 	: 10000
Bulk insert ==> true
Insert rows : 	: 10000
Bulk insert ==> true
############### end ###############
반응형