Skip to content

Commit

Permalink
feat(sql): Add JSON support
Browse files Browse the repository at this point in the history
  • Loading branch information
ztefanie committed Feb 10, 2025
1 parent beddcbf commit b3eaa4c
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 37 deletions.
5 changes: 5 additions & 0 deletions connectors/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>jdbi3-core</artifactId>
<version>3.47.0</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-jackson2</artifactId>
<version>3.47.0</version>
</dependency>

<dependency>
<groupId>io.camunda.connector</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Map;
import java.util.Objects;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.SqlStatement;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,12 +42,12 @@ JdbcResponse internalExecuteRequest(JdbcRequestData data, Connection connection)
throws SQLException, IllegalAccessException {
JdbcResponse response;
Jdbi jdbi = Jdbi.create(connection);
jdbi.installPlugin(new Jackson2Plugin());
if (data.returnResults()) {
// SELECT query, or RETURNING clause
LOG.debug("Executing query: {}", data.query());
List<Map<String, Object>> result =
jdbi.withHandle(
handle -> bindVariables(handle.createQuery(data.query()), data).mapToMap().list());
Query q = jdbi.withHandle(handle -> bindVariables(handle.createQuery(data.query()), data));
List<Map<String, Object>> result = JdbiJsonHelper.mapToParsedMap(q).list();
response = JdbcResponse.of(result);
LOG.debug("JdbcResponse: {}", response);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.camunda.connector.jdbc.model.client;

import com.fasterxml.jackson.databind.JsonNode;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.jdbi.v3.core.qualifier.QualifiedType;
import org.jdbi.v3.core.result.ResultIterable;
import org.jdbi.v3.core.result.UnableToProduceResultException;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.json.Json;

public class JdbiJsonHelper {

public static ResultIterable<Map<String, Object>> mapToParsedMap(Query query) {
return query.map(
(rs, ctx) -> {
Map<String, Object> row = new HashMap<>();
for (int i = 1; i <= rs.getMetaData().getColumnCount(); i++) {
String columnName = rs.getMetaData().getColumnLabel(i);
Object value = rs.getObject(i);
if (isPotentiallyJsonColumn(rs.getMetaData().getColumnTypeName(i), value)) {
try {
value =
ctx.findColumnMapperFor(QualifiedType.of(JsonNode.class).with(Json.class))
.orElseThrow()
.map(rs, i, ctx);
} catch (UnableToProduceResultException ignored) {
row.put(columnName, value);
}
}
row.put(columnName, value);
}
return row;
});
}

private static boolean isPotentiallyJsonColumn(int columnType) {
Set<Integer> jsonCompatibleTypes =
Set.of(
Types.VARCHAR,
Types.LONGVARCHAR,
Types.NCHAR,
Types.NVARCHAR,
Types.NCLOB,
Types.BLOB,
Types.CLOB,
Types.JAVA_OBJECT,
Types.OTHER);
return jsonCompatibleTypes.contains(columnType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.camunda.connector.jdbc.model.client.JdbcClient;
import io.camunda.connector.jdbc.model.client.JdbiJdbcClient;
import io.camunda.connector.jdbc.model.client.JdbiJsonHelper;
import io.camunda.connector.jdbc.model.request.JdbcRequest;
import io.camunda.connector.jdbc.model.request.JdbcRequestData;
import io.camunda.connector.jdbc.model.request.SupportedDatabase;
Expand All @@ -29,19 +34,49 @@
import java.util.stream.Collectors;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.result.NoResultsException;
import org.jdbi.v3.core.statement.Query;
import org.jdbi.v3.core.statement.UnableToCreateStatementException;
import org.jdbi.v3.jackson2.Jackson2Plugin;

public abstract class IntegrationBaseTest {
static final Employee NEW_EMPLOYEE = new Employee(7, "Eve", 55, "HR");

static final String DEFAULT_ADDRESS_JSON =
"{\"street\":\"123 Main St\",\"city\":\"New York\",\"zip\":\"10001\"}";
static final Employee NEW_EMPLOYEE = new Employee(7, "Eve", 55, "HR", DEFAULT_ADDRESS_JSON);

static final List<Employee> DEFAULT_EMPLOYEES =
List.of(
new Employee(1, "John Doe", 30, "IT"),
new Employee(2, "Jane Doe", 25, "HR"),
new Employee(3, "Alice", 35, "Finance"),
new Employee(4, "Bob", 40, "IT"),
new Employee(5, "Charlie", 45, "HR"),
new Employee(6, "David", 50, "Finance"));
new Employee(1, "John Doe", 30, "IT", DEFAULT_ADDRESS_JSON),
new Employee(
2,
"Jane Doe",
25,
"HR",
"{\"street\":\"456 Elm St\",\"city\":\"Los Angeles\",\"zip\":\"90001\"}"),
new Employee(
3,
"Alice",
35,
"Finance",
"{\"street\":\"789 Oak St\",\"city\":\"Chicago\",\"zip\":\"60601\"}"),
new Employee(
4,
"Bob",
40,
"IT",
"{\"street\":\"101 Pine St\",\"city\": \"San Francisco\",\"zip\":\"94101\"}"),
new Employee(
5,
"Charlie",
45,
"HR",
"{\"street\":\"202 Maple St\",\"city\":\"Boston\", \"zip\":\"02101\"}"),
new Employee(
6,
"David",
50,
"Finance",
"{\"street\":\"303 Cedar St\",\"city\":\"Seattle\",\"zip\":\"98101\"}"));

private final JdbcClient jdbiJdbcClient = new JdbiJdbcClient();

Expand All @@ -56,19 +91,55 @@ static void createEmployeeTable(IntegrationTestConfig config) throws SQLExceptio
}
}

void addJsonColumn(IntegrationTestConfig config, String jsonDatabaseType) throws SQLException {
try (Connection conn =
DriverManager.getConnection(config.url(), config.username(), config.password());
Statement stmt = conn.createStatement()) {

if (config.databaseName() != null) {
stmt.executeUpdate("USE " + config.databaseName());
}

String addColumnSQL = "ALTER TABLE Employee ADD json " + jsonDatabaseType;
stmt.executeUpdate(addColumnSQL);
String dummyJson;
switch (config.database()) {
case MYSQL, MARIADB -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'";
case POSTGRESQL -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'::json";
case MSSQL -> dummyJson = "'" + DEFAULT_ADDRESS_JSON + "'";
default ->
throw new UnsupportedOperationException("Unsupported database: " + config.database());
}
String updateSQL = "UPDATE Employee SET json = " + dummyJson;
stmt.executeUpdate(updateSQL);
}
}

void dropJsonColumn(IntegrationTestConfig config) throws SQLException {
try (Connection conn =
DriverManager.getConnection(config.url(), config.username(), config.password());
Statement stmt = conn.createStatement()) {

if (config.databaseName() != null) {
stmt.executeUpdate("USE " + config.databaseName());
}
stmt.executeUpdate("ALTER TABLE Employee DROP COLUMN json");
}
}

List<Map<String, Object>> selectAll(IntegrationTestConfig config, String tableName)
throws SQLException {
try (Connection conn =
DriverManager.getConnection(config.url(), config.username(), config.password())) {
// using jdbi
try (var handle = Jdbi.create(conn).open()) {
var jdbi = Jdbi.create(conn);
jdbi.installPlugin(new Jackson2Plugin());
try (var handle = jdbi.open()) {
if (config.databaseName() != null) {
handle.execute("USE " + config.databaseName());
}
return handle
.createQuery("SELECT * FROM " + tableName + " ORDER BY id ASC")
.mapToMap()
.list();
Query q = handle.createQuery("SELECT * FROM " + tableName + " ORDER BY id ASC");
return JdbiJsonHelper.mapToParsedMap(q).list();
}
}
}
Expand Down Expand Up @@ -122,6 +193,9 @@ void selectDataAndAssertSuccess(IntegrationTestConfig config) {
var row = response.resultSet().get(i);
var employee = DEFAULT_EMPLOYEES.get(i);
assertEquals(employee.toMap(), row);
assertEquals(
((ObjectNode) employee.toMap().get("address")).get("street").asText(),
((ObjectNode) row.get("address")).get("street").asText());
}
}

Expand Down Expand Up @@ -272,12 +346,12 @@ void selectDataWithPositionalParametersWhereInAndAssertSuccess(IntegrationTestCo
assertNull(response.modifiedRows());
assertNotNull(response.resultSet());
assertEquals(2, response.resultSet().size());
assertEquals(
assertTrue(
DEFAULT_EMPLOYEES.stream()
.filter(e -> e.name().equals("John Doe") || e.name().equals("Jane Doe"))
.map(Employee::toMap)
.collect(Collectors.toList()),
response.resultSet());
.toList()
.containsAll(response.resultSet()));
}

void selectDataWithBindingParametersWhereInAndAssertSuccess(IntegrationTestConfig config) {
Expand Down Expand Up @@ -307,6 +381,26 @@ void selectDataWithBindingParametersWhereInAndAssertSuccess(IntegrationTestConfi
response.resultSet());
}

void selectJsonDataAndAssertSuccess(IntegrationTestConfig config) throws JsonProcessingException {
JdbcRequest request =
new JdbcRequest(
config.database(),
new DetailedConnection(
config.host(),
config.port(),
config.username(),
config.password(),
config.databaseName(),
config.properties()),
new JdbcRequestData(true, "SELECT * FROM Employee ORDER BY Id ASC"));
var response = jdbiJdbcClient.executeRequest(request);

var row = response.resultSet().get(0);
ObjectMapper objectMapper = new ObjectMapper();
var a = objectMapper.readTree(DEFAULT_ADDRESS_JSON);
assertEquals(a.get("street").asText(), ((ObjectNode) row.get("json")).get("street").asText());
}

void updateDataAndAssertSuccess(IntegrationTestConfig config) {
String name = DEFAULT_EMPLOYEES.get(0).name() + " UPDATED";
JdbcRequest request =
Expand Down Expand Up @@ -545,8 +639,8 @@ void insertDataWithNamedParametersAndAssertSuccess(IntegrationTestConfig config)
config.properties()),
new JdbcRequestData(
false,
"INSERT INTO Employee (id, name, age, department) VALUES (:id, :name, :age, :department)",
NEW_EMPLOYEE.toMap()));
"INSERT INTO Employee (id, name, age, department, address) VALUES (:id, :name, :age, :department, :address)",
NEW_EMPLOYEE.toUnparsedMap()));
var response = jdbiJdbcClient.executeRequest(request);
assertEquals(1, response.modifiedRows());
assertNull(response.resultSet());
Expand All @@ -565,12 +659,13 @@ void insertDataWithPositionalParametersAndAssertSuccess(IntegrationTestConfig co
config.properties()),
new JdbcRequestData(
false,
"INSERT INTO Employee (id, name, age, department) VALUES (?, ?, ?, ?)",
"INSERT INTO Employee (id, name, age, department, address) VALUES (?, ?, ?, ?, ?)",
List.of(
NEW_EMPLOYEE.id(),
NEW_EMPLOYEE.name(),
NEW_EMPLOYEE.age(),
NEW_EMPLOYEE.department())));
NEW_EMPLOYEE.department(),
NEW_EMPLOYEE.address())));
var response = jdbiJdbcClient.executeRequest(request);
assertEquals(1, response.modifiedRows());
assertNull(response.resultSet());
Expand All @@ -589,14 +684,15 @@ void insertDataWithBindingParametersAndAssertSuccess(IntegrationTestConfig confi
config.properties()),
new JdbcRequestData(
false,
"INSERT INTO Employee (id, name, age, department) VALUES (<params>)",
"INSERT INTO Employee (id, name, age, department, address) VALUES (<params>)",
Map.of(
"params",
List.of(
NEW_EMPLOYEE.id(),
NEW_EMPLOYEE.name(),
NEW_EMPLOYEE.age(),
NEW_EMPLOYEE.department()))));
NEW_EMPLOYEE.department(),
NEW_EMPLOYEE.address))));
var response = jdbiJdbcClient.executeRequest(request);
assertEquals(1, response.modifiedRows());
assertNull(response.resultSet());
Expand Down Expand Up @@ -688,24 +784,46 @@ void cleanUpDatabase(IntegrationTestConfig config, String databaseName) throws S
}
}

record Employee(int id, String name, int age, String department) {
record Employee(int id, String name, int age, String department, String address) {

static final String INSERT = "INSERT INTO Employee (id, name, age, department) VALUES %s";
static final String INSERT =
"INSERT INTO Employee (id, name, age, department, address) VALUES %s";
static final String CREATE_TABLE =
"""
CREATE TABLE Employee (
id INT PRIMARY KEY,
name VARCHAR(100),
age INT,
department VARCHAR(100));
department VARCHAR(100),
address VARCHAR(100));
""";

String toInsertQueryFormat() {
return String.format("(%d, '%s', %d, '%s')", id, name, age, department);
return String.format("(%d, '%s', %d, '%s', '%s')", id, name, age, department, address);
}

Map<String, Object> toMap() {
return Map.of("id", id, "name", name, "age", age, "department", department);
try {
ObjectMapper objectMapper = new ObjectMapper();
return Map.of(
"id",
id,
"name",
name,
"age",
age,
"department",
department,
"address",
objectMapper.readTree(address));
} catch (JsonProcessingException e) {
}
return null;
}

Map<String, Object> toUnparsedMap() {
return Map.of(
"id", id, "name", name, "age", age, "department", department, "address", address);
}
}
}
Loading

0 comments on commit b3eaa4c

Please sign in to comment.