Skip to content

Commit

Permalink
Merge pull request #23 from embulk/fix-code-format
Browse files Browse the repository at this point in the history
Fixed code format that warned by checkstyle
  • Loading branch information
sakama committed Jan 28, 2016
2 parents e947f07 + 821eb8f commit 6e29bf9
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 42 deletions.
8 changes: 5 additions & 3 deletions src/main/java/org/embulk/output/BigqueryAuthentication.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ public BigqueryAuthentication(String authMethod, Optional<String> serviceAccount

if (authMethod.toLowerCase().equals("compute_engine")) {
this.credentials = getComputeCredential();
} else if(authMethod.toLowerCase().equals("json_key")) {
}
else if (authMethod.toLowerCase().equals("json_key")) {
this.credentials = getServiceAccountCredentialFromJsonFile();
} else {
}
else {
this.credentials = getServiceAccountCredential();
}
}
Expand Down Expand Up @@ -111,4 +113,4 @@ public Bigquery getBigqueryClient() throws GoogleJsonResponseException, IOExcept

return client;
}
}
}
73 changes: 52 additions & 21 deletions src/main/java/org/embulk/output/BigqueryOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public interface PluginTask
}

private final Logger log = Exec.getLogger(BigqueryOutputPlugin.class);
private final static String temporaryTableSuffix = Long.toString(System.currentTimeMillis());
private static final String temporaryTableSuffix = Long.toString(System.currentTimeMillis());
private static BigqueryWriter bigQueryWriter;

@Override
Expand All @@ -161,7 +161,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
}
try {
task.setP12Keyfile(Optional.of(LocalFile.of(task.getP12KeyfilePath().get())));
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
Expand All @@ -172,7 +173,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
}
try {
task.setSchemaFile(Optional.of(LocalFile.of(task.getSchemaPath().get())));
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
Expand All @@ -181,7 +183,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
if (!task.getJsonKeyfile().isPresent()) {
throw new ConfigException("If auth_method is json_key, you have to set json_keyfile");
}
} else if (task.getAuthMethod().getString().equals("private_key")) {
}
else if (task.getAuthMethod().getString().equals("private_key")) {
if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) {
throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile");
}
Expand All @@ -200,7 +203,7 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
}

try {
bigQueryWriter = new BigqueryWriter.Builder (
bigQueryWriter = new BigqueryWriter.Builder(
task.getAuthMethod().getString(),
task.getServiceAccountEmail(),
task.getP12Keyfile().transform(localFileToPathString()),
Expand All @@ -222,7 +225,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,

bigQueryWriter.checkConfig(task.getProject(), task.getDataset(), task.getTable());

} catch (IOException | GeneralSecurityException ex) {
}
catch (IOException | GeneralSecurityException ex) {
throw new ConfigException(ex);
}
// non-retryable (non-idempotent) output:
Expand All @@ -242,7 +246,8 @@ public ConfigDiff resume(TaskSource taskSource,
if (mode == Mode.delete_in_advance) {
try {
bigQueryWriter.deleteTable(project, dataset, generateTableName(tableName));
} catch (IOException ex) {
}
catch (IOException ex) {
log.warn(ex.getMessage());
}
}
Expand All @@ -255,13 +260,16 @@ public ConfigDiff resume(TaskSource taskSource,
bigQueryWriter.replaceTable(project, dataset, generateTableName(tableName) + "_old", generateTableName(tableName));
}
bigQueryWriter.replaceTable(project, dataset, generateTableName(tableName), generateTemporaryTableName(tableName));
} catch (TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
}
catch (TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
log.error(ex.getMessage());
throw Throwables.propagate(ex);
} finally {
}
finally {
try {
bigQueryWriter.deleteTable(project, dataset, generateTemporaryTableName(tableName));
} catch (IOException ex) {
}
catch (IOException ex) {
log.warn(ex.getMessage());
}
}
Expand Down Expand Up @@ -328,7 +336,8 @@ public void nextFile()
}
log.info(String.format("Writing file [%s]", filePath));
output = new BufferedOutputStream(new FileOutputStream(filePath));
} catch (FileNotFoundException ex) {
}
catch (FileNotFoundException ex) {
throw Throwables.propagate(ex);
}
fileIndex++;
Expand All @@ -339,7 +348,8 @@ private void closeFile()
if (output != null) {
try {
output.close();
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
Expand All @@ -349,9 +359,11 @@ public void add(Buffer buffer)
{
try {
output.write(buffer.array(), buffer.offset(), buffer.limit());
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
} finally {
}
finally {
buffer.release();
}
}
Expand All @@ -367,7 +379,8 @@ public void finish()
log.info(String.format("Delete local file [%s]", filePath));
file.delete();
}
} catch (NoSuchAlgorithmException | TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
}
catch (NoSuchAlgorithmException | TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
log.error(ex.getMessage());
throw Throwables.propagate(ex);
}
Expand Down Expand Up @@ -447,15 +460,24 @@ public enum Mode
append("append"),
delete_in_advance("delete_in_advance") {
@Override
public boolean isDeleteInAdvance() { return true; }
public boolean isDeleteInAdvance()
{
return true;
}
},
replace("replace") {
@Override
public boolean isReplaceMode() { return true; }
public boolean isReplaceMode()
{
return true;
}
},
replace_backup("replace_backup") {
@Override
public boolean isReplaceMode() { return true; }
public boolean isReplaceMode()
{
return true;
}
};

private final String string;
Expand All @@ -465,8 +487,17 @@ public enum Mode
this.string = string;
}

public String getString() { return string; }
public boolean isReplaceMode() { return false; }
public boolean isDeleteInAdvance() { return true; }
public String getString()
{
return string;
}
public boolean isReplaceMode()
{
return false;
}
public boolean isDeleteInAdvance()
{
return true;
}
}
}
54 changes: 36 additions & 18 deletions src/main/java/org/embulk/output/BigqueryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public BigqueryWriter(Builder builder)

if (autoCreateTable) {
this.tableSchema = createTableSchema();
} else {
}
else {
this.tableSchema = null;
}
}
Expand Down Expand Up @@ -100,7 +101,8 @@ private String getJobStatus(String project, JobReference jobRef) throws JobFaile
log.info(String.format("Job statistics [%s]", statistics.getLoad()));
}
return jobStatus;
} catch (IOException ex) {
}
catch (IOException ex) {
log.warn(ex.getMessage());
return "UNKNOWN";
}
Expand All @@ -118,14 +120,17 @@ private void getJobStatusUntilDone(String project, JobReference jobRef) throws T
if (jobStatus.equals("DONE")) {
log.info(String.format("Job completed successfully. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "SUCCESS"));
break;
} else if (elapsedTime > jobStatusMaxPollingTime * 1000) {
}
else if (elapsedTime > jobStatusMaxPollingTime * 1000) {
throw new TimeoutException(String.format("Checking job status...Timeout. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "TIMEOUT"));
} else {
}
else {
log.info(String.format("Checking job status... job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, jobStatus));
}
Thread.sleep(jobStatusPollingInterval * 1000);
}
} catch (InterruptedException ex) {
}
catch (InterruptedException ex) {
log.warn(ex.getMessage());
}
}
Expand Down Expand Up @@ -171,13 +176,15 @@ public void executeLoad(String project, String dataset, String table, String loc

try {
jobRef = insert.execute().getJobReference();
} catch (IllegalStateException ex) {
}
catch (IllegalStateException ex) {
throw new JobFailedException(ex.getMessage());
}
log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath));
if (isSkipJobResultCheck) {
log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId()));
} else {
}
else {
getJobStatusUntilDone(project, jobRef);
}
}
Expand All @@ -203,19 +210,22 @@ public void copyTable(String project, String dataset, String fromTable, String t

try {
jobRef = insert.execute().getJobReference();
} catch (IllegalStateException ex) {
}
catch (IllegalStateException ex) {
throw new JobFailedException(ex.getMessage());
}
log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId()));
getJobStatusUntilDone(project, jobRef);
}

public void deleteTable(String project, String dataset, String table) throws IOException {
public void deleteTable(String project, String dataset, String table) throws IOException
{
try {
Tables.Delete delete = bigQueryClient.tables().delete(project, dataset, table);
delete.execute();
log.info(String.format("Table deleted. project:%s dataset:%s table:%s", delete.getProjectId(), delete.getDatasetId(), delete.getTableId()));
} catch (GoogleJsonResponseException ex) {
}
catch (GoogleJsonResponseException ex) {
log.warn(ex.getMessage());
}
}
Expand All @@ -238,7 +248,8 @@ private JobConfigurationLoad setLoadConfig(String project, String dataset, Strin
config.setSchema(tableSchema);
config.setCreateDisposition("CREATE_IF_NEEDED");
log.info(String.format("table:[%s] will be create if not exists", table));
} else {
}
else {
config.setCreateDisposition("CREATE_NEVER");
}
return config;
Expand All @@ -252,7 +263,8 @@ private JobConfigurationTableCopy setCopyConfig(String project, String dataset,

if (append) {
config.setWriteDisposition("WRITE_APPEND");
} else {
}
else {
config.setWriteDisposition("WRITE_TRUNCATE");
}

Expand Down Expand Up @@ -294,7 +306,8 @@ public TableSchema createTableSchema() throws IOException
ObjectMapper mapper = new ObjectMapper();
List<TableFieldSchema> fields = mapper.readValue(stream, new TypeReference<List<TableFieldSchema>>() {});
return new TableSchema().setFields(fields);
} finally {
}
finally {
if (stream != null) {
stream.close();
}
Expand All @@ -306,7 +319,8 @@ public boolean isExistTable(String project, String dataset, String table) throws
Tables tableRequest = bigQueryClient.tables();
try {
Table tableData = tableRequest.get(project, dataset, table).execute();
} catch (GoogleJsonResponseException ex) {
}
catch (GoogleJsonResponseException ex) {
return false;
}
return true;
Expand All @@ -317,13 +331,15 @@ public void checkConfig(String project, String dataset, String table) throws IOE
if (autoCreateTable) {
if (!schemaPath.isPresent()) {
throw new FileNotFoundException("schema_file is empty");
} else {
}
else {
File file = new File(schemaPath.orNull());
if (!file.exists()) {
throw new FileNotFoundException("Can not load schema file.");
}
}
} else {
}
else {
if (!isExistTable(project, dataset, table)) {
throw new IOException(String.format("table [%s] is not exists", table));
}
Expand All @@ -347,7 +363,8 @@ private String getLocalMd5hash(String filePath) throws NoSuchAlgorithmException,

byte[] encoded = (hashedBytes);
return new String(encoded);
} finally {
}
finally {
stream.close();
}
}
Expand Down Expand Up @@ -490,7 +507,8 @@ public BigqueryWriter build() throws IOException, GeneralSecurityException

public class JobFailedException extends RuntimeException
{
public JobFailedException(String message) {
public JobFailedException(String message)
{
super(message);
}
}
Expand Down

0 comments on commit 6e29bf9

Please sign in to comment.