Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed code format that warned by checkstyle #23

Merged
merged 1 commit into from
Jan 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/main/java/org/embulk/output/BigqueryAuthentication.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ public BigqueryAuthentication(String authMethod, Optional<String> serviceAccount

if (authMethod.toLowerCase().equals("compute_engine")) {
this.credentials = getComputeCredential();
} else if(authMethod.toLowerCase().equals("json_key")) {
}
else if (authMethod.toLowerCase().equals("json_key")) {
this.credentials = getServiceAccountCredentialFromJsonFile();
} else {
}
else {
this.credentials = getServiceAccountCredential();
}
}
Expand Down Expand Up @@ -111,4 +113,4 @@ public Bigquery getBigqueryClient() throws GoogleJsonResponseException, IOExcept

return client;
}
}
}
73 changes: 52 additions & 21 deletions src/main/java/org/embulk/output/BigqueryOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public interface PluginTask
}

private final Logger log = Exec.getLogger(BigqueryOutputPlugin.class);
private final static String temporaryTableSuffix = Long.toString(System.currentTimeMillis());
private static final String temporaryTableSuffix = Long.toString(System.currentTimeMillis());
private static BigqueryWriter bigQueryWriter;

@Override
Expand All @@ -161,7 +161,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
}
try {
task.setP12Keyfile(Optional.of(LocalFile.of(task.getP12KeyfilePath().get())));
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
Expand All @@ -172,7 +173,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
}
try {
task.setSchemaFile(Optional.of(LocalFile.of(task.getSchemaPath().get())));
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
Expand All @@ -181,7 +183,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
if (!task.getJsonKeyfile().isPresent()) {
throw new ConfigException("If auth_method is json_key, you have to set json_keyfile");
}
} else if (task.getAuthMethod().getString().equals("private_key")) {
}
else if (task.getAuthMethod().getString().equals("private_key")) {
if (!task.getP12Keyfile().isPresent() || !task.getServiceAccountEmail().isPresent()) {
throw new ConfigException("If auth_method is private_key, you have to set both service_account_email and p12_keyfile");
}
Expand All @@ -200,7 +203,7 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,
}

try {
bigQueryWriter = new BigqueryWriter.Builder (
bigQueryWriter = new BigqueryWriter.Builder(
task.getAuthMethod().getString(),
task.getServiceAccountEmail(),
task.getP12Keyfile().transform(localFileToPathString()),
Expand All @@ -222,7 +225,8 @@ public ConfigDiff transaction(ConfigSource config, int taskCount,

bigQueryWriter.checkConfig(task.getProject(), task.getDataset(), task.getTable());

} catch (IOException | GeneralSecurityException ex) {
}
catch (IOException | GeneralSecurityException ex) {
throw new ConfigException(ex);
}
// non-retryable (non-idempotent) output:
Expand All @@ -242,7 +246,8 @@ public ConfigDiff resume(TaskSource taskSource,
if (mode == Mode.delete_in_advance) {
try {
bigQueryWriter.deleteTable(project, dataset, generateTableName(tableName));
} catch (IOException ex) {
}
catch (IOException ex) {
log.warn(ex.getMessage());
}
}
Expand All @@ -255,13 +260,16 @@ public ConfigDiff resume(TaskSource taskSource,
bigQueryWriter.replaceTable(project, dataset, generateTableName(tableName) + "_old", generateTableName(tableName));
}
bigQueryWriter.replaceTable(project, dataset, generateTableName(tableName), generateTemporaryTableName(tableName));
} catch (TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
}
catch (TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
log.error(ex.getMessage());
throw Throwables.propagate(ex);
} finally {
}
finally {
try {
bigQueryWriter.deleteTable(project, dataset, generateTemporaryTableName(tableName));
} catch (IOException ex) {
}
catch (IOException ex) {
log.warn(ex.getMessage());
}
}
Expand Down Expand Up @@ -328,7 +336,8 @@ public void nextFile()
}
log.info(String.format("Writing file [%s]", filePath));
output = new BufferedOutputStream(new FileOutputStream(filePath));
} catch (FileNotFoundException ex) {
}
catch (FileNotFoundException ex) {
throw Throwables.propagate(ex);
}
fileIndex++;
Expand All @@ -339,7 +348,8 @@ private void closeFile()
if (output != null) {
try {
output.close();
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
Expand All @@ -349,9 +359,11 @@ public void add(Buffer buffer)
{
try {
output.write(buffer.array(), buffer.offset(), buffer.limit());
} catch (IOException ex) {
}
catch (IOException ex) {
throw Throwables.propagate(ex);
} finally {
}
finally {
buffer.release();
}
}
Expand All @@ -367,7 +379,8 @@ public void finish()
log.info(String.format("Delete local file [%s]", filePath));
file.delete();
}
} catch (NoSuchAlgorithmException | TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
}
catch (NoSuchAlgorithmException | TimeoutException | BigqueryWriter.JobFailedException | IOException ex) {
log.error(ex.getMessage());
throw Throwables.propagate(ex);
}
Expand Down Expand Up @@ -447,15 +460,24 @@ public enum Mode
append("append"),
delete_in_advance("delete_in_advance") {
@Override
public boolean isDeleteInAdvance() { return true; }
public boolean isDeleteInAdvance()
{
return true;
}
},
replace("replace") {
@Override
public boolean isReplaceMode() { return true; }
public boolean isReplaceMode()
{
return true;
}
},
replace_backup("replace_backup") {
@Override
public boolean isReplaceMode() { return true; }
public boolean isReplaceMode()
{
return true;
}
};

private final String string;
Expand All @@ -465,8 +487,17 @@ public enum Mode
this.string = string;
}

public String getString() { return string; }
public boolean isReplaceMode() { return false; }
public boolean isDeleteInAdvance() { return true; }
public String getString()
{
return string;
}
public boolean isReplaceMode()
{
return false;
}
public boolean isDeleteInAdvance()
{
return true;
}
}
}
54 changes: 36 additions & 18 deletions src/main/java/org/embulk/output/BigqueryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public BigqueryWriter(Builder builder)

if (autoCreateTable) {
this.tableSchema = createTableSchema();
} else {
}
else {
this.tableSchema = null;
}
}
Expand Down Expand Up @@ -100,7 +101,8 @@ private String getJobStatus(String project, JobReference jobRef) throws JobFaile
log.info(String.format("Job statistics [%s]", statistics.getLoad()));
}
return jobStatus;
} catch (IOException ex) {
}
catch (IOException ex) {
log.warn(ex.getMessage());
return "UNKNOWN";
}
Expand All @@ -118,14 +120,17 @@ private void getJobStatusUntilDone(String project, JobReference jobRef) throws T
if (jobStatus.equals("DONE")) {
log.info(String.format("Job completed successfully. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "SUCCESS"));
break;
} else if (elapsedTime > jobStatusMaxPollingTime * 1000) {
}
else if (elapsedTime > jobStatusMaxPollingTime * 1000) {
throw new TimeoutException(String.format("Checking job status...Timeout. job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, "TIMEOUT"));
} else {
}
else {
log.info(String.format("Checking job status... job id:[%s] elapsed_time:%dms status:[%s]", jobRef.getJobId(), elapsedTime, jobStatus));
}
Thread.sleep(jobStatusPollingInterval * 1000);
}
} catch (InterruptedException ex) {
}
catch (InterruptedException ex) {
log.warn(ex.getMessage());
}
}
Expand Down Expand Up @@ -171,13 +176,15 @@ public void executeLoad(String project, String dataset, String table, String loc

try {
jobRef = insert.execute().getJobReference();
} catch (IllegalStateException ex) {
}
catch (IllegalStateException ex) {
throw new JobFailedException(ex.getMessage());
}
log.info(String.format("Job executed. job id:[%s] file:[%s]", jobRef.getJobId(), localFilePath));
if (isSkipJobResultCheck) {
log.info(String.format("Skip job status check. job id:[%s]", jobRef.getJobId()));
} else {
}
else {
getJobStatusUntilDone(project, jobRef);
}
}
Expand All @@ -203,19 +210,22 @@ public void copyTable(String project, String dataset, String fromTable, String t

try {
jobRef = insert.execute().getJobReference();
} catch (IllegalStateException ex) {
}
catch (IllegalStateException ex) {
throw new JobFailedException(ex.getMessage());
}
log.info(String.format("Job executed. job id:[%s]", jobRef.getJobId()));
getJobStatusUntilDone(project, jobRef);
}

public void deleteTable(String project, String dataset, String table) throws IOException {
public void deleteTable(String project, String dataset, String table) throws IOException
{
try {
Tables.Delete delete = bigQueryClient.tables().delete(project, dataset, table);
delete.execute();
log.info(String.format("Table deleted. project:%s dataset:%s table:%s", delete.getProjectId(), delete.getDatasetId(), delete.getTableId()));
} catch (GoogleJsonResponseException ex) {
}
catch (GoogleJsonResponseException ex) {
log.warn(ex.getMessage());
}
}
Expand All @@ -238,7 +248,8 @@ private JobConfigurationLoad setLoadConfig(String project, String dataset, Strin
config.setSchema(tableSchema);
config.setCreateDisposition("CREATE_IF_NEEDED");
log.info(String.format("table:[%s] will be create if not exists", table));
} else {
}
else {
config.setCreateDisposition("CREATE_NEVER");
}
return config;
Expand All @@ -252,7 +263,8 @@ private JobConfigurationTableCopy setCopyConfig(String project, String dataset,

if (append) {
config.setWriteDisposition("WRITE_APPEND");
} else {
}
else {
config.setWriteDisposition("WRITE_TRUNCATE");
}

Expand Down Expand Up @@ -294,7 +306,8 @@ public TableSchema createTableSchema() throws IOException
ObjectMapper mapper = new ObjectMapper();
List<TableFieldSchema> fields = mapper.readValue(stream, new TypeReference<List<TableFieldSchema>>() {});
return new TableSchema().setFields(fields);
} finally {
}
finally {
if (stream != null) {
stream.close();
}
Expand All @@ -306,7 +319,8 @@ public boolean isExistTable(String project, String dataset, String table) throws
Tables tableRequest = bigQueryClient.tables();
try {
Table tableData = tableRequest.get(project, dataset, table).execute();
} catch (GoogleJsonResponseException ex) {
}
catch (GoogleJsonResponseException ex) {
return false;
}
return true;
Expand All @@ -317,13 +331,15 @@ public void checkConfig(String project, String dataset, String table) throws IOE
if (autoCreateTable) {
if (!schemaPath.isPresent()) {
throw new FileNotFoundException("schema_file is empty");
} else {
}
else {
File file = new File(schemaPath.orNull());
if (!file.exists()) {
throw new FileNotFoundException("Can not load schema file.");
}
}
} else {
}
else {
if (!isExistTable(project, dataset, table)) {
throw new IOException(String.format("table [%s] is not exists", table));
}
Expand All @@ -347,7 +363,8 @@ private String getLocalMd5hash(String filePath) throws NoSuchAlgorithmException,

byte[] encoded = (hashedBytes);
return new String(encoded);
} finally {
}
finally {
stream.close();
}
}
Expand Down Expand Up @@ -490,7 +507,8 @@ public BigqueryWriter build() throws IOException, GeneralSecurityException

public class JobFailedException extends RuntimeException
{
public JobFailedException(String message) {
public JobFailedException(String message)
{
super(message);
}
}
Expand Down