diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java index 13423cabbd1..894d0a3004c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java @@ -158,32 +158,24 @@ public boolean deleteDagAction(DagAction dagAction) throws IOException { private DagAction getDagActionWithRetry(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff exponentialBackoff) throws IOException, SQLException { return dbStatementExecutor.withPreparedStatement(String.format(GET_STATEMENT, tableName), getStatement -> { - ResultSet rs = null; - try { - int i = 0; - getStatement.setString(++i, flowGroup); - getStatement.setString(++i, flowName); - getStatement.setString(++i, flowExecutionId); - getStatement.setString(++i, flowActionType.toString()); - rs = getStatement.executeQuery(); + int i = 0; + getStatement.setString(++i, flowGroup); + getStatement.setString(++i, flowName); + getStatement.setString(++i, flowExecutionId); + getStatement.setString(++i, flowActionType.toString()); + try (ResultSet rs = getStatement.executeQuery()) { if (rs.next()) { return new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4))); + } else if (exponentialBackoff.awaitNextRetryIfAvailable()) { + return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff); } else { - if (exponentialBackoff.awaitNextRetryIfAvailable()) { - return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, flowActionType, exponentialBackoff); - } else { - log.warn(String.format("Can not find dag action: %s with flowGroup: %s, flowName: %s, flowExecutionId: %s", - flowActionType, flowGroup, flowName, flowExecutionId)); - return null; - } + log.warn(String.format("Can not find dag action: %s with flowGroup: %s, flowName: %s, flowExecutionId: %s", + flowActionType, flowGroup, flowName, flowExecutionId)); + return null; } } catch (SQLException | InterruptedException e) { throw new IOException(String.format("Failure get %s from table %s", new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), tableName), e); - } finally { - if (rs != null) { - rs.close(); - } } }, true); } @@ -191,20 +183,14 @@ private DagAction getDagActionWithRetry(String flowGroup, String flowName, Strin @Override public Collection getDagActions() throws IOException { return dbStatementExecutor.withPreparedStatement(String.format(GET_ALL_STATEMENT, tableName), getAllStatement -> { - ResultSet rs = null; - try { - HashSet result = new HashSet<>(); - rs = getAllStatement.executeQuery(); + HashSet result = new HashSet<>(); + try (ResultSet rs = getAllStatement.executeQuery()) { while (rs.next()) { result.add(new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), FlowActionType.valueOf(rs.getString(4)))); } return result; } catch (SQLException e) { throw new IOException(String.format("Failure get dag actions from table %s ", tableName), e); - } finally { - if (rs != null) { - rs.close(); - } } }, true); }