diff --git a/src/main/java/org/embulk/output/BigqueryAuthentication.java b/src/main/java/org/embulk/output/BigqueryAuthentication.java index a536dfa..84bfa97 100644 --- a/src/main/java/org/embulk/output/BigqueryAuthentication.java +++ b/src/main/java/org/embulk/output/BigqueryAuthentication.java @@ -47,9 +47,11 @@ public BigqueryAuthentication(String authMethod, Optional serviceAccount if (authMethod.toLowerCase().equals("compute_engine")) { this.credentials = getComputeCredential(); - } else if(authMethod.toLowerCase().equals("json_key")) { + } + else if (authMethod.toLowerCase().equals("json_key")) { this.credentials = getServiceAccountCredentialFromJsonFile(); - } else { + } + else { this.credentials = getServiceAccountCredential(); } } @@ -111,4 +113,4 @@ public Bigquery getBigqueryClient() throws GoogleJsonResponseException, IOExcept return client; } -} \ No newline at end of file +} diff --git a/src/main/java/org/embulk/output/BigqueryOutputPlugin.java b/src/main/java/org/embulk/output/BigqueryOutputPlugin.java index d845b08..f8aff8c 100644 --- a/src/main/java/org/embulk/output/BigqueryOutputPlugin.java +++ b/src/main/java/org/embulk/output/BigqueryOutputPlugin.java @@ -146,7 +146,7 @@ public interface PluginTask } private final Logger log = Exec.getLogger(BigqueryOutputPlugin.class); - private final static String temporaryTableSuffix = Long.toString(System.currentTimeMillis()); + private static final String temporaryTableSuffix = Long.toString(System.currentTimeMillis()); private static BigqueryWriter bigQueryWriter; @Override @@ -161,7 +161,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount, } try { task.setP12Keyfile(Optional.of(LocalFile.of(task.getP12KeyfilePath().get()))); - } catch (IOException ex) { + } + catch (IOException ex) { throw Throwables.propagate(ex); } } @@ -172,7 +173,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount, } try { task.setSchemaFile(Optional.of(LocalFile.of(task.getSchemaPath().get()))); - } catch (IOException ex) { + } + catch (IOException ex) { throw Throwables.propagate(ex); } } @@ -181,7 +183,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount, if (!task.getJsonKeyfile().isPresent()) { throw new ConfigException("If auth_method is json_key, you have to set json_keyfile"); } - } else if (task.getAuthMethod().getString().equals("private_key")) { + } + else if (task.getAuthMethod().getString().equals("private_key")) { if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) { throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile"); } @@ -200,7 +203,7 @@ public ConfigDiff transaction(ConfigSource config, int taskCount, } try { - bigQueryWriter = new BigqueryWriter.Builder ( + bigQueryWriter = new BigqueryWriter.Builder( task.getAuthMethod().getString(), task.getServiceAccountEmail(), task.getP12Keyfile().transform(localFileToPathString()), @@ -222,7 +225,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount, bigQueryWriter.checkConfig(task.getProject(), task.getDataset(), task.getTable()); - } catch (IOException | GeneralSecurityException ex) { + } + catch (IOException | GeneralSecurityException ex) { throw new ConfigException(ex); } // non-retryable (non-idempotent) output: @@ -242,7 +246,8 @@ public ConfigDiff resume(TaskSource taskSource, if (mode == Mode.delete_in_advance) { try { bigQueryWriter.deleteTable(project, dataset, generateTableName(tableName)); - } catch (IOException ex) { + } + catch (IOException ex) { log.warn(ex.getMessage()); } } @@ -255,13 +260,16 @@ public ConfigDiff resume(TaskSource taskSource, bigQueryWriter.replaceTable(project, dataset, generateTableName(tableName) + "_old", generateTableName(tableName)); } bigQueryWriter.replaceTable(project, dataset, generateTableName(tableName), generateTemporaryTableName(tableName)); - } catch (TimeoutException | BigqueryWriter.JobFailedException | IOException ex) { + } + catch (TimeoutException | BigqueryWriter.JobFailedException | IOException ex) { log.error(ex.getMessage()); throw Throwables.propagate(ex); - } finally { + } + finally { try { bigQueryWriter.deleteTable(project, dataset, generateTemporaryTableName(tableName)); - } catch (IOException ex) { + } + catch (IOException ex) { log.warn(ex.getMessage()); } } @@ -328,7 +336,8 @@ public void nextFile() } log.info(String.format("Writing file [%s]", filePath)); output = new BufferedOutputStream(new FileOutputStream(filePath)); - } catch (FileNotFoundException ex) { + } + catch (FileNotFoundException ex) { throw Throwables.propagate(ex); } fileIndex++; @@ -339,7 +348,8 @@ private void closeFile() if (output != null) { try { output.close(); - } catch (IOException ex) { + } + catch (IOException ex) { throw Throwables.propagate(ex); } } @@ -349,9 +359,11 @@ public void add(Buffer buffer) { try { output.write(buffer.array(), buffer.offset(), buffer.limit()); - } catch (IOException ex) { + } + catch (IOException ex) { throw Throwables.propagate(ex); - } finally { + } + finally { buffer.release(); } } @@ -367,7 +379,8 @@ public void finish() log.info(String.format("Delete local file [%s]", filePath)); file.delete(); } - } catch (NoSuchAlgorithmException | TimeoutException | BigqueryWriter.JobFailedException | IOException ex) { + } + catch (NoSuchAlgorithmException | TimeoutException | BigqueryWriter.JobFailedException | IOException ex) { log.error(ex.getMessage()); throw Throwables.propagate(ex); } @@ -447,15 +460,24 @@ public enum Mode append("append"), delete_in_advance("delete_in_advance") { @Override - public boolean isDeleteInAdvance() { return true; } + public boolean isDeleteInAdvance() + { + return true; + } }, replace("replace") { @Override - public boolean isReplaceMode() { return true; } + public boolean isReplaceMode() + { + return true; + } }, replace_backup("replace_backup") { @Override - public boolean isReplaceMode() { return true; } + public boolean isReplaceMode() + { + return true; + } }; private final String string; @@ -465,8 +487,17 @@ public enum Mode this.string = string; } - public String getString() { return string; } - public boolean isReplaceMode() { return false; } - public boolean isDeleteInAdvance() { return true; } + public String getString() + { + return string; + } + public boolean isReplaceMode() + { + return false; + } + public boolean isDeleteInAdvance() + { + return true; + } } } diff --git a/src/main/java/org/embulk/output/BigqueryWriter.java b/src/main/java/org/embulk/output/BigqueryWriter.java index 69219a6..6dbb930 100644 --- a/src/main/java/org/embulk/output/BigqueryWriter.java +++ b/src/main/java/org/embulk/output/BigqueryWriter.java @@ -72,7 +72,8 @@ public BigqueryWriter(Builder builder) if (autoCreateTable) { this.tableSchema = createTableSchema(); - } else { + } + else { this.tableSchema = null; } } @@ -100,7 +101,8 @@ private String getJobStatus(String project, JobReference jobRef) throws JobFaile log.info(String.format("Job statistics [%s]", statistics.getLoad())); } return jobStatus; - } catch (IOException ex) { + } + catch (IOException ex) { log.warn(ex.getMessage()); return "UNKNOWN"; } @@ -118,14 +120,17 @@ private void getJobStatusUntilDone(String project, JobReference jobRef) throws T if (jobStatus.equals("DONE")) { log.info(String.format("Job completed successfully. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "SUCCESS")); break; - } else if (elapsedTime > jobStatusMaxPollingTime * 1000) { + } + else if (elapsedTime > jobStatusMaxPollingTime * 1000) { throw new TimeoutException(String.format("Checking job status...Timeout. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "TIMEOUT")); - } else { + } + else { log.info(String.format("Checking job status... job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, jobStatus)); } Thread.sleep(jobStatusPollingInterval * 1000); } - } catch (InterruptedException ex) { + } + catch (InterruptedException ex) { log.warn(ex.getMessage()); } } @@ -171,13 +176,15 @@ public void executeLoad(String project, String dataset, String table, String loc try { jobRef = insert.execute().getJobReference(); - } catch (IllegalStateException ex) { + } + catch (IllegalStateException ex) { throw new JobFailedException(ex.getMessage()); } log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath)); if (isSkipJobResultCheck) { log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId())); - } else { + } + else { getJobStatusUntilDone(project, jobRef); } } @@ -203,19 +210,22 @@ public void copyTable(String project, String dataset, String fromTable, String t try { jobRef = insert.execute().getJobReference(); - } catch (IllegalStateException ex) { + } + catch (IllegalStateException ex) { throw new JobFailedException(ex.getMessage()); } log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId())); getJobStatusUntilDone(project, jobRef); } - public void deleteTable(String project, String dataset, String table) throws IOException { + public void deleteTable(String project, String dataset, String table) throws IOException + { try { Tables.Delete delete = bigQueryClient.tables().delete(project, dataset, table); delete.execute(); log.info(String.format("Table deleted. project:%s dataset:%s table:%s", delete.getProjectId(), delete.getDatasetId(), delete.getTableId())); - } catch (GoogleJsonResponseException ex) { + } + catch (GoogleJsonResponseException ex) { log.warn(ex.getMessage()); } } @@ -238,7 +248,8 @@ private JobConfigurationLoad setLoadConfig(String project, String dataset, Strin config.setSchema(tableSchema); config.setCreateDisposition("CREATE_IF_NEEDED"); log.info(String.format("table:[%s] will be create if not exists", table)); - } else { + } + else { config.setCreateDisposition("CREATE_NEVER"); } return config; @@ -252,7 +263,8 @@ private JobConfigurationTableCopy setCopyConfig(String project, String dataset, if (append) { config.setWriteDisposition("WRITE_APPEND"); - } else { + } + else { config.setWriteDisposition("WRITE_TRUNCATE"); } @@ -294,7 +306,8 @@ public TableSchema createTableSchema() throws IOException ObjectMapper mapper = new ObjectMapper(); List fields = mapper.readValue(stream, new TypeReference>() {}); return new TableSchema().setFields(fields); - } finally { + } + finally { if (stream != null) { stream.close(); } @@ -306,7 +319,8 @@ public boolean isExistTable(String project, String dataset, String table) throws Tables tableRequest = bigQueryClient.tables(); try { Table tableData = tableRequest.get(project, dataset, table).execute(); - } catch (GoogleJsonResponseException ex) { + } + catch (GoogleJsonResponseException ex) { return false; } return true; @@ -317,13 +331,15 @@ public void checkConfig(String project, String dataset, String table) throws IOE if (autoCreateTable) { if (!schemaPath.isPresent()) { throw new FileNotFoundException("schema_file is empty"); - } else { + } + else { File file = new File(schemaPath.orNull()); if (!file.exists()) { throw new FileNotFoundException("Can not load schema file."); } } - } else { + } + else { if (!isExistTable(project, dataset, table)) { throw new IOException(String.format("table [%s] is not exists", table)); } @@ -347,7 +363,8 @@ private String getLocalMd5hash(String filePath) throws NoSuchAlgorithmException, byte[] encoded = (hashedBytes); return new String(encoded); - } finally { + } + finally { stream.close(); } } @@ -490,7 +507,8 @@ public BigqueryWriter build() throws IOException, GeneralSecurityException public class JobFailedException extends RuntimeException { - public JobFailedException(String message) { + public JobFailedException(String message) + { super(message); } }