diff --git a/src/main/java/com/google/cloud/spanner/jdbc/JdbcConnection.java b/src/main/java/com/google/cloud/spanner/jdbc/JdbcConnection.java index 9b5d5e88e..26b381dd2 100644 --- a/src/main/java/com/google/cloud/spanner/jdbc/JdbcConnection.java +++ b/src/main/java/com/google/cloud/spanner/jdbc/JdbcConnection.java @@ -91,14 +91,14 @@ class JdbcConnection extends AbstractJdbcConnection { private final Metrics metrics; - private final Attributes openTelemetryAttributes; + private final Attributes openTelemetryMetricsAttributes; JdbcConnection(String connectionUrl, ConnectionOptions options) throws SQLException { super(connectionUrl, options); this.useLegacyIsValidCheck = useLegacyValidCheck(); OpenTelemetry openTelemetry = getSpanner().getOptions().getOpenTelemetry(); - this.openTelemetryAttributes = - createOpenTelemetryAttributes(getConnectionOptions().getDatabaseId()); + this.openTelemetryMetricsAttributes = + createOpenTelemetryAttributes(getConnectionOptions().getDatabaseId(), false); this.metrics = new Metrics(openTelemetry); } @@ -114,9 +114,13 @@ static boolean useLegacyValidCheck() { } @VisibleForTesting - static Attributes createOpenTelemetryAttributes(DatabaseId databaseId) { + static Attributes createOpenTelemetryAttributes( + DatabaseId databaseId, boolean includeConnectionId) { AttributesBuilder attributesBuilder = Attributes.builder(); - attributesBuilder.put("connection_id", UUID.randomUUID().toString()); + // A unique connection ID should only be included for tracing and not for metrics. + if (includeConnectionId) { + attributesBuilder.put("connection_id", UUID.randomUUID().toString()); + } attributesBuilder.put("database", databaseId.getDatabase()); attributesBuilder.put("instance_id", databaseId.getInstanceId().getInstance()); attributesBuilder.put("project_id", databaseId.getInstanceId().getProject()); @@ -124,7 +128,7 @@ static Attributes createOpenTelemetryAttributes(DatabaseId databaseId) { } public void recordClientLibLatencyMetric(long value) { - metrics.recordClientLibLatency(value, openTelemetryAttributes); + metrics.recordClientLibLatency(value, openTelemetryMetricsAttributes); } @Override diff --git a/src/test/java/com/google/cloud/spanner/jdbc/it/ITOpenTelemetryTest.java b/src/test/java/com/google/cloud/spanner/jdbc/it/ITOpenTelemetryTest.java index bbbbaf344..83cfab81c 100644 --- a/src/test/java/com/google/cloud/spanner/jdbc/it/ITOpenTelemetryTest.java +++ b/src/test/java/com/google/cloud/spanner/jdbc/it/ITOpenTelemetryTest.java @@ -51,8 +51,14 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; @@ -125,21 +131,21 @@ public void setup() { } @Test - public void testGlobalOpenTelemetry() throws SQLException, IOException, InterruptedException { + public void testGlobalOpenTelemetry() throws Exception { GlobalOpenTelemetry.resetForTest(); GlobalOpenTelemetry.set(openTelemetry); Properties info = new Properties(); info.put("enableExtendedTracing", "true"); try (Connection connection = createConnection(env, database, info)) { testOpenTelemetry(connection); + testOpenTelemetryConcurrency(() -> createConnection(env, database, info)); } finally { GlobalOpenTelemetry.resetForTest(); } } @Test - public void testOpenTelemetryInProperties() - throws SQLException, IOException, InterruptedException { + public void testOpenTelemetryInProperties() throws Exception { // Make sure there is no Global OpenTelemetry. GlobalOpenTelemetry.resetForTest(); Properties info = new Properties(); @@ -148,6 +154,7 @@ public void testOpenTelemetryInProperties() try (Connection connection = createConnection(env, database, info)) { testOpenTelemetry(connection); } + testOpenTelemetryConcurrency(() -> createConnection(env, database, info)); } private void testOpenTelemetry(Connection connection) @@ -159,14 +166,14 @@ private void testOpenTelemetry(Connection connection) // Test executeQuery(String) try (ResultSet resultSet = statement.executeQuery(sql)) { - assertQueryResult(resultSet, sql, uuid); + assertQueryResult(resultSet, sql, uuid, true); } // Test execute(String) uuid = UUID.randomUUID(); sql = "select '" + uuid + "'"; assertTrue(statement.execute(sql)); - assertQueryResult(statement.getResultSet(), sql, uuid); + assertQueryResult(statement.getResultSet(), sql, uuid, true); // Test executeUpdate(String) uuid = UUID.randomUUID(); @@ -189,7 +196,7 @@ private void testOpenTelemetry(Connection connection) String sql = "select '" + uuid + "'"; try (PreparedStatement statement = connection.prepareStatement(sql)) { try (ResultSet resultSet = statement.executeQuery()) { - assertQueryResult(resultSet, sql, uuid); + assertQueryResult(resultSet, sql, uuid, true); } } @@ -197,7 +204,7 @@ private void testOpenTelemetry(Connection connection) sql = "select '" + uuid + "'"; try (PreparedStatement statement = connection.prepareStatement(sql)) { assertTrue(statement.execute()); - assertQueryResult(statement.getResultSet(), sql, uuid); + assertQueryResult(statement.getResultSet(), sql, uuid, true); } uuid = UUID.randomUUID(); @@ -217,14 +224,53 @@ private void testOpenTelemetry(Connection connection) } } - private void assertQueryResult(ResultSet resultSet, String sql, UUID uuid) + private interface ConnectionProducer { + Connection createConnection() throws SQLException; + } + + private void testOpenTelemetryConcurrency(ConnectionProducer connectionProducer) + throws Exception { + int numThreads = 16; + int numIterations = 1000; + ExecutorService executor = Executors.newFixedThreadPool(16); + List> futures = new ArrayList<>(numThreads); + for (int n = 0; n < numThreads; n++) { + futures.add( + executor.submit( + (Callable) + () -> { + try (Connection connection = connectionProducer.createConnection(); + Statement statement = connection.createStatement()) { + for (int i = 0; i < numIterations; i++) { + UUID uuid = UUID.randomUUID(); + String sql = "select '" + uuid + "'"; + + try (ResultSet resultSet = statement.executeQuery(sql)) { + assertQueryResult(resultSet, sql, uuid, false); + } + } + } + return null; + })); + } + executor.shutdown(); + assertTrue(executor.awaitTermination(600L, TimeUnit.SECONDS)); + for (Future future : futures) { + // Just verify that we did not get an exception. + future.get(); + } + } + + private void assertQueryResult(ResultSet resultSet, String sql, UUID uuid, boolean assertTrace) throws SQLException, IOException, InterruptedException { assertTrue(resultSet.next()); assertEquals(uuid.toString(), resultSet.getString(1)); assertFalse(resultSet.next()); flushOpenTelemetry(); - assertTrace(sql); + if (assertTrace) { + assertTrace(sql); + } } private void assertUpdateResult(long updateCount, String sql)