Skip to content

Commit

Permalink
feat: FIR-43038 allow jdbc to send batch as a single multi statement (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stepansergeevitch authored Feb 19, 2025
1 parent 3d6a82e commit 4750fe1
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 15 deletions.
48 changes: 48 additions & 0 deletions src/integrationTest/java/integration/tests/ConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,54 @@ void validatesOnSystemEngineIfParameterProvided() throws SQLException {

}

@Test
@Tag("v2")
void preparedStatementBatchesWorkIfMergeParameterProvided() throws SQLException {
String engineName = integration.ConnectionInfo.getInstance().getEngine();
String queryLabel = "test_merge_batches_" + System.currentTimeMillis();
try (Connection connection = createConnection(engineName, Map.of("merge_prepared_statement_batches", "true"))) {
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("CREATE TABLE test_table (id INT)");
try (java.sql.PreparedStatement preparedStatement = connection.prepareStatement(
String.format("/*%s*/INSERT INTO test_table VALUES (?)", queryLabel))) {
for (int i = 0; i < 10; i++) {
preparedStatement.setInt(1, i);
preparedStatement.addBatch();
}
preparedStatement.executeBatch();

}
try (ResultSet rs = statement.executeQuery("SELECT COUNT(*) FROM test_table")) {
assertTrue(rs.next());
assertEquals(10, rs.getInt(1));
}
// sleep for 10s to give QH time to get populated and avoid flakiness
// it sometime takes that long
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// ignore
}

// Validate we've only executed one insert
String qhQuery = "SELECT count(*) from information_schema.engine_query_history WHERE status='ENDED_SUCCESSFULLY' " +
String.format("AND lower(query_text) like '/*%s*/insert into %%'", queryLabel);
System.out.println(qhQuery);
try (java.sql.PreparedStatement preparedStatement = connection.prepareStatement(qhQuery)) {
try (ResultSet rs = preparedStatement.executeQuery()) {
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
}
}

} finally {
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("DROP TABLE IF EXISTS test_table");
}
}
}
}

void unsuccessfulConnect(boolean useDatabase, boolean useEngine) throws SQLException {
ConnectionInfo params = integration.ConnectionInfo.getInstance();
String url = getJdbcUrl(params, useDatabase, useEngine);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ void shouldInsertAndSelectComplexStruct() throws SQLException {
.executeQuery("SELECT test_struct FROM test_struct")) {
rs.next();
assertEquals(FireboltDataType.STRUCT.name().toLowerCase()
+ "(id int, s struct(a array(text null), `b column` timestamp null))",
+ "(id int, s struct(a array(text null) null, `b column` timestamp null))",
rs.getMetaData().getColumnTypeName(1).toLowerCase());
String expectedJson = String.format(
"{\"id\":%d,\"s\":{\"a\":[\"%s\",\"%s\"],\"b column\":\"%s\"}}", 1, car1.getTags()[0],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@CustomLog
class TimeoutTest extends IntegrationTest {
private static final int MIN_TIME_SECONDS = 350;
private static final Map<Integer, Long> SERIES_SIZE = Map.of(1, 80000000000L, 2, 700000000000L);
private static final Map<Integer, Long> SERIES_SIZE = Map.of(1, 80000000000L, 2, 400000000000L);
private long startTime;

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ SET prevent_create_on_information_schema=true;
SET enable_create_table_with_struct_type=true;
DROP TABLE IF EXISTS test_struct;
DROP TABLE IF EXISTS test_struct_helper;
CREATE TABLE IF NOT EXISTS test_struct(id int not null, s struct(a array(text) not null, "b column" datetime null) not null);
CREATE TABLE IF NOT EXISTS test_struct(id int not null, s struct(a array(text) null, "b column" datetime null) not null);
CREATE TABLE IF NOT EXISTS test_struct_helper(a array(text) not null, "b column" datetime null);
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class FireboltProperties {
private final String userClients;
private final String accessToken;
private final boolean validateOnSystemEngine;
private final boolean mergePreparedStatementBatches;
@Builder.Default
private Map<String, String> initialAdditionalProperties = new HashMap<>();
@Builder.Default
Expand Down Expand Up @@ -112,6 +113,7 @@ public FireboltProperties(Properties properties) {
userDrivers = getSetting(properties, FireboltSessionProperty.USER_DRIVERS);
userClients = getSetting(properties, FireboltSessionProperty.USER_CLIENTS);
validateOnSystemEngine = getSetting(properties, FireboltSessionProperty.VALIDATE_ON_SYSTEM_ENGINE);
mergePreparedStatementBatches = getSetting(properties, FireboltSessionProperty.MERGE_PREPARED_STATEMENT_BATCHES);

environment = getEnvironment(configuredEnvironment, properties);
host = getHost(configuredEnvironment, properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public enum FireboltSessionProperty {
VALIDATE_ON_SYSTEM_ENGINE("validate_on_system_engine", false, Boolean.class,
"Whether to validate the connection on the system engine or not. By default validates on an engine currently connected.",
FireboltProperties::isValidateOnSystemEngine),
MERGE_PREPARED_STATEMENT_BATCHES("merge_prepared_statement_batches", false, Boolean.class,
"Whether to send prepared statement batches as a single statement. By default, they are sent one by one.", FireboltProperties::isMergePreparedStatementBatches),
// We keep all the deprecated properties to ensure backward compatibility - but
// they do not have any effect.
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class FireboltStatement extends JdbcBase implements Statement {

private final FireboltStatementService statementService;
private final FireboltProperties sessionProperties;
protected final FireboltProperties sessionProperties;
private final FireboltConnection connection;
private final Collection<String> statementsToExecuteLabels = new HashSet<>();
private boolean closeOnCompletion = false;
Expand Down Expand Up @@ -436,7 +436,7 @@ public int[] executeBatch() throws SQLException {
result.add(rs.map(x -> 0).orElse(SUCCESS_NO_INFO));
}
}
return result.stream().mapToInt(Integer::intValue).toArray();
return result.stream().mapToInt(Integer::intValue).toArray();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,34 @@ public void setArray(int parameterIndex, Array x) throws SQLException {
public int[] executeBatch() throws SQLException {
validateStatementIsNotClosed();
log.debug("Executing batch for statement: {}", rawStatement);
List<StatementInfoWrapper> inserts = new ArrayList<>();
List<StatementInfoWrapper> statements = new ArrayList<>();
int[] result = new int[rows.size()];
for (Map<Integer, String> row : rows) {
inserts.addAll(prepareSQL(row));
statements.addAll(prepareSQL(row));
}
execute(inserts);
for (int i = 0; i < inserts.size(); i++) {
if (sessionProperties.isMergePreparedStatementBatches()) {
if (!statements.isEmpty()) {
execute(List.of(asSingleStatement(statements)));
}
} else {
execute(statements);
}
for (int i = 0; i < statements.size(); i++) {
result[i] = SUCCESS_NO_INFO;
}
return result;
}

private StatementInfoWrapper asSingleStatement(List<StatementInfoWrapper> queries) {
// merge all queries into a single query, separated by semicolons
StringBuilder sb = new StringBuilder();
var first = queries.get(0);
for (StatementInfoWrapper query : queries) {
sb.append(query.getSql()).append(";");
}
return new StatementInfoWrapper(sb.toString(), first.getInitialStatement().getStatementType(), first.getParam(), first.getInitialStatement());
}

@Override
@NotImplemented
public int executeUpdate(String sql) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
abstract class FireboltConnectionTest {
Expand Down Expand Up @@ -373,6 +369,28 @@ void shouldForceSystemEngineWhenValidateOnSystemEngineIsSet() throws SQLExceptio
}
}

@Test
void shouldSendBatchesInSingleQueryWhenMergeBatchesIsSet() throws SQLException {
when(fireboltStatementService.execute(any(), any(), any()))
.thenReturn(Optional.empty());
Properties propertiesCopy = new Properties(connectionProperties);
propertiesCopy.put("merge_prepared_statement_batches", "true");
try (FireboltConnection fireboltConnection = createConnection(URL, propertiesCopy)) {
fireboltConnection.createStatement().execute("SET param=value");
PreparedStatement statement = fireboltConnection.prepareStatement("INSERT INTO t VALUES (?)");
statement.setInt(1, 1);
statement.addBatch();
statement.setInt(1, 2);
statement.addBatch();
statement.executeBatch();
verify(fireboltStatementService, atLeast(2)).execute(queryInfoWrapperArgumentCaptor.capture(),
propertiesArgumentCaptor.capture(), any());
assertEquals("INSERT INTO t VALUES (1);INSERT INTO t VALUES (2);", queryInfoWrapperArgumentCaptor.getValue().getSql());
// Validate that parameters are preserved
assertEquals(Map.of("param", "value"), propertiesArgumentCaptor.getValue().getAdditionalProperties());
}
}

@Test
void shouldIgnore429WhenValidatingConnection() throws SQLException {
when(fireboltStatementService.execute(any(), any(), any()))
Expand Down Expand Up @@ -738,5 +756,16 @@ void shouldValidateOnUserEngineByDefault() throws SQLException {
}
}

@Test
void shouldSendBatchesSeparatelyByDefault() throws SQLException {
try (FireboltConnection fireboltConnection = createConnection(URL, connectionProperties)) {
assertEquals("false", fireboltConnection.getClientInfo().get("merge_prepared_statement_batches"));
}
connectionProperties.put("merge_prepared_statement_batches", "true");
try (FireboltConnection fireboltConnection = createConnection(URL, connectionProperties)) {
assertEquals("true", fireboltConnection.getClientInfo().get("merge_prepared_statement_batches"));
}
}

protected abstract FireboltConnection createConnection(String url, Properties props) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ void shouldHaveAllTheSpecifiedCustomProperties() {
properties.put("someCustomProperties", "custom_value");
properties.put("compress", "1");
properties.put("validate_on_system_engine", "true");
properties.put("merge_prepared_statement_batches", "true");

Map<String, String> customProperties = new HashMap<>();
customProperties.put("someCustomProperties", "custom_value");
Expand All @@ -59,7 +60,7 @@ void shouldHaveAllTheSpecifiedCustomProperties() {
.initialAdditionalProperties(customProperties).keepAliveTimeoutMillis(300000)
.maxConnectionsTotal(300).maxRetries(3).socketTimeoutMillis(20).connectionTimeoutMillis(60000)
.tcpKeepInterval(30).tcpKeepIdle(60).tcpKeepCount(10).environment("app").validateOnSystemEngine(true)
.build();
.mergePreparedStatementBatches(true).build();
assertEquals(expectedDefaultProperties, new FireboltProperties(properties));
}

Expand Down

0 comments on commit 4750fe1

Please sign in to comment.