diff --git a/plugin/trino-prometheus/pom.xml b/plugin/trino-prometheus/pom.xml index 7a44709522d4..ab9fa7e76e36 100644 --- a/plugin/trino-prometheus/pom.xml +++ b/plugin/trino-prometheus/pom.xml @@ -166,6 +166,20 @@ + + io.trino + trino-main + test-jar + test + + + + commons-codec + commons-codec + + + + io.trino trino-testing diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/PrometheusServer.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/PrometheusServer.java index 703f02261ab0..bf22307340f6 100644 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/PrometheusServer.java +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/PrometheusServer.java @@ -20,7 +20,6 @@ import java.net.URI; import java.time.Duration; -import static io.trino.plugin.prometheus.PrometheusClient.METRICS_ENDPOINT; import static org.testcontainers.utility.MountableFile.forClasspathResource; public class PrometheusServer @@ -32,6 +31,7 @@ public class PrometheusServer public static final String USER = "admin"; public static final String PASSWORD = "password"; + public static final String PROMETHEUS_QUERY_API = "/api/v1/query?query=up[1d]"; private final GenericContainer dockerContainer; @@ -44,14 +44,14 @@ public PrometheusServer(String version, boolean enableBasicAuth) { this.dockerContainer = new GenericContainer<>("prom/prometheus:" + version) .withExposedPorts(PROMETHEUS_PORT) - .waitingFor(Wait.forHttp(METRICS_ENDPOINT).forResponsePredicate(response -> response.contains("\"up\""))) - .withStartupTimeout(Duration.ofSeconds(120)); + .waitingFor(Wait.forHttp(PROMETHEUS_QUERY_API).forResponsePredicate(response -> response.contains("\"values\""))) + .withStartupTimeout(Duration.ofSeconds(360)); // Basic authentication was introduced in v2.24.0 if (enableBasicAuth) { this.dockerContainer .withCommand("--config.file=/etc/prometheus/prometheus.yml", "--web.config.file=/etc/prometheus/web.yml") .withCopyFileToContainer(forClasspathResource("web.yml"), "/etc/prometheus/web.yml") - .waitingFor(Wait.forHttp(METRICS_ENDPOINT).forResponsePredicate(response -> response.contains("\"up\"")).withBasicCredentials(USER, PASSWORD)) + .waitingFor(Wait.forHttp(PROMETHEUS_QUERY_API).forResponsePredicate(response -> response.contains("\"values\"")).withBasicCredentials(USER, PASSWORD)) .withStartupTimeout(Duration.ofSeconds(360)); } this.dockerContainer.start(); diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegration.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegration.java new file mode 100644 index 000000000000..a2dd9254279d --- /dev/null +++ b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegration.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.prometheus; + +import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.DynamicFilter; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.MaterializedResult; +import io.trino.testing.QueryRunner; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusClient; +import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusQueryRunner; +import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; +import static java.util.concurrent.TimeUnit.DAYS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public class TestPrometheusIntegration + extends AbstractTestQueryFramework +{ + private static final int NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS = 100; + + private PrometheusServer server; + private PrometheusClient client; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + this.server = closeAfterClass(new PrometheusServer()); + this.client = createPrometheusClient(server); + return createPrometheusQueryRunner(server, ImmutableMap.of(), ImmutableMap.of()); + } + + @Test + public void testSelectTable() + { + assertThat(query("SELECT labels FROM prometheus.default.up LIMIT 1")) + .matches("SELECT MAP(ARRAY[VARCHAR 'instance', '__name__', 'job'], ARRAY[VARCHAR 'localhost:9090', 'up', 'prometheus'])"); + } + + @Test + public void testPushDown() + { + // default interval on the `up` metric that Prometheus records on itself is about 15 seconds, so this should only yield one or two row + MaterializedResult results = computeActual("SELECT * FROM prometheus.default.up WHERE timestamp > (NOW() - INTERVAL '15' SECOND)"); + assertThat(results).hasSizeBetween(1, 2); + } + + @Test + public void testShowTables() + { + assertQuery("SHOW TABLES IN default LIKE 'up'", "VALUES 'up'"); + } + + @Test + public void testShowCreateSchema() + { + assertQuery("SHOW CREATE SCHEMA default", "VALUES 'CREATE SCHEMA prometheus.default'"); + assertQueryFails("SHOW CREATE SCHEMA unknown", ".*Schema 'prometheus.unknown' does not exist"); + } + + @Test + public void testListSchemaNames() + { + assertQuery("SHOW SCHEMAS LIKE 'default'", "VALUES 'default'"); + } + + @Test + public void testCreateTable() + { + assertQueryFails("CREATE TABLE default.foo (text VARCHAR)", "This connector does not support creating tables"); + } + + @Test + public void testDropTable() + { + assertQueryFails("DROP TABLE default.up", "This connector does not support dropping tables"); + } + + @Test + public void testDescribeTable() + { + assertQuery("DESCRIBE default.up", + "VALUES " + + "('labels', 'map(varchar, varchar)', '', '')," + + "('timestamp', 'timestamp(3) with time zone', '', '')," + + "('value', 'double', '', '')"); + } + + @Test + public void testCorrectNumberOfSplitsCreated() + { + PrometheusConnectorConfig config = new PrometheusConnectorConfig(); + config.setPrometheusURI(server.getUri()); + config.setMaxQueryRangeDuration(new Duration(21, DAYS)); + config.setQueryChunkSizeDuration(new Duration(1, DAYS)); + config.setCacheDuration(new Duration(30, SECONDS)); + PrometheusTable table = client.getTable("default", "up"); + PrometheusSplitManager splitManager = new PrometheusSplitManager(client, new PrometheusClock(), config); + ConnectorSplitSource splits = splitManager.getSplits( + null, + null, + new PrometheusTableHandle("default", table.getName()), + null, + (DynamicFilter) null); + int numSplits = splits.getNextBatch(NOT_PARTITIONED, NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS).getNow(null).getSplits().size(); + assertEquals(numSplits, config.getMaxQueryRangeDuration().getValue(TimeUnit.SECONDS) / config.getQueryChunkSizeDuration().getValue(TimeUnit.SECONDS), + 0.001); + } +} diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationMetrics.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationMetrics.java deleted file mode 100644 index bc406e4a0db6..000000000000 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationMetrics.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.prometheus; - -import com.google.common.collect.ImmutableMap; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.QueryRunner; -import org.testng.annotations.Test; - -import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusQueryRunner; - -public class TestPrometheusIntegrationMetrics - extends AbstractTestQueryFramework -{ - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - PrometheusServer server = closeAfterClass(new PrometheusServer()); - return createPrometheusQueryRunner(server, ImmutableMap.of(), ImmutableMap.of()); - } - - @Test - public void testShowTables() - { - assertQuery("SHOW TABLES IN default LIKE 'up'", "VALUES 'up'"); - } - - @Test - public void testShowCreateSchema() - { - assertQuery("SHOW CREATE SCHEMA default", "VALUES 'CREATE SCHEMA prometheus.default'"); - assertQueryFails("SHOW CREATE SCHEMA unknown", ".*Schema 'prometheus.unknown' does not exist"); - } - - @Test - public void testListSchemaNames() - { - assertQuery("SHOW SCHEMAS LIKE 'default'", "VALUES 'default'"); - } - - @Test - public void testCreateTable() - { - assertQueryFails("CREATE TABLE default.foo (text VARCHAR)", "This connector does not support creating tables"); - } - - @Test - public void testDropTable() - { - assertQueryFails("DROP TABLE default.up", "This connector does not support dropping tables"); - } - - @Test - public void testDescribeTable() - { - assertQuery("DESCRIBE default.up", - "VALUES " + - "('labels', 'map(varchar, varchar)', '', '')," + - "('timestamp', 'timestamp(3) with time zone', '', '')," + - "('value', 'double', '', '')"); - } -} diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationSchema.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationSchema.java deleted file mode 100644 index b8e0b2ad6769..000000000000 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationSchema.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.prometheus; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import io.airlift.units.Duration; -import io.trino.spi.TrinoException; -import io.trino.spi.connector.ColumnMetadata; -import io.trino.spi.connector.ConnectorSplitSource; -import io.trino.spi.connector.ConnectorTableMetadata; -import io.trino.spi.connector.DynamicFilter; -import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.TableNotFoundException; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import static io.trino.plugin.prometheus.MetadataUtil.varcharMapType; -import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE; -import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusClient; -import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED; -import static io.trino.spi.type.DoubleType.DOUBLE; -import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; -import static io.trino.testing.TestingConnectorSession.SESSION; -import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -/** - * Integration tests against Prometheus container - */ -@Test(singleThreaded = true) -public class TestPrometheusIntegrationSchema -{ - private static final PrometheusTableHandle RUNTIME_DETERMINED_TABLE_HANDLE = new PrometheusTableHandle("default", "up"); - - private static final int NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS = 100; - - private PrometheusServer server; - private PrometheusClient client; - - @BeforeClass - public void createQueryRunner() - { - this.server = new PrometheusServer(); - this.client = createPrometheusClient(server); - } - - @AfterClass(alwaysRun = true) - public final void destroy() - { - server.close(); - } - - @Test - public void testRetrieveUpValue() - { - assertTrue(client.getTableNames("default").contains("up"), "Prometheus' own `up` metric should be available in default"); - } - - @Test - public void testMetadata() - { - assertTrue(client.getTableNames("default").contains("up")); - PrometheusTable table = client.getTable("default", "up"); - assertNotNull(table, "table is null"); - assertEquals(table.getName(), "up"); - assertEquals(table.getColumns(), ImmutableList.of( - new PrometheusColumn("labels", varcharMapType), - new PrometheusColumn("timestamp", TIMESTAMP_COLUMN_TYPE), - new PrometheusColumn("value", DOUBLE))); - } - - @Test - public void testGetTableHandle() - { - PrometheusMetadata metadata = new PrometheusMetadata(client); - assertEquals(metadata.getTableHandle(SESSION, new SchemaTableName("default", "up")), RUNTIME_DETERMINED_TABLE_HANDLE); - assertNull(metadata.getTableHandle(SESSION, new SchemaTableName("default", "unknown"))); - assertNull(metadata.getTableHandle(SESSION, new SchemaTableName("unknown", "numbers"))); - assertNull(metadata.getTableHandle(SESSION, new SchemaTableName("unknown", "unknown"))); - } - - @Test - public void testGetColumnHandles() - { - PrometheusMetadata metadata = new PrometheusMetadata(client); - // known table - assertEquals(metadata.getColumnHandles(SESSION, RUNTIME_DETERMINED_TABLE_HANDLE), ImmutableMap.of( - "labels", new PrometheusColumnHandle("labels", createUnboundedVarcharType(), 0), - "value", new PrometheusColumnHandle("value", DOUBLE, 1), - "timestamp", new PrometheusColumnHandle("timestamp", TIMESTAMP_COLUMN_TYPE, 2))); - - // unknown table - try { - metadata.getColumnHandles(SESSION, new PrometheusTableHandle("unknown", "unknown")); - fail("Expected getColumnHandle of unknown table to throw a TableNotFoundException"); - } - catch (TableNotFoundException expected) { - } - try { - metadata.getColumnHandles(SESSION, new PrometheusTableHandle("default", "unknown")); - fail("Expected getColumnHandle of unknown table to throw a TableNotFoundException"); - } - catch (TableNotFoundException expected) { - } - } - - @Test - public void testGetTableMetadata() - { - PrometheusMetadata metadata = new PrometheusMetadata(client); - // known table - ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, RUNTIME_DETERMINED_TABLE_HANDLE); - assertEquals(tableMetadata.getTable(), new SchemaTableName("default", "up")); - assertEquals(tableMetadata.getColumns(), ImmutableList.of( - new ColumnMetadata("labels", varcharMapType), - new ColumnMetadata("timestamp", TIMESTAMP_COLUMN_TYPE), - new ColumnMetadata("value", DOUBLE))); - - // unknown tables should produce null - assertNull(metadata.getTableMetadata(SESSION, new PrometheusTableHandle("unknown", "unknown"))); - assertNull(metadata.getTableMetadata(SESSION, new PrometheusTableHandle("default", "unknown"))); - assertNull(metadata.getTableMetadata(SESSION, new PrometheusTableHandle("unknown", "numbers"))); - } - - @Test - public void testListTables() - { - PrometheusMetadata metadata = new PrometheusMetadata(client); - assertTrue(ImmutableSet.copyOf(metadata.listTables(SESSION, Optional.of("default"))).contains(new SchemaTableName("default", "up"))); - - // unknown schema - assertThatThrownBy(() -> metadata.listTables(SESSION, Optional.of("unknown"))) - .isInstanceOf(TrinoException.class) - .hasMessageContaining("Prometheus did no return metrics list (table names): "); - } - - @Test - public void testCorrectNumberOfSplitsCreated() - { - PrometheusConnectorConfig config = new PrometheusConnectorConfig(); - config.setPrometheusURI(server.getUri()); - config.setMaxQueryRangeDuration(new Duration(21, DAYS)); - config.setQueryChunkSizeDuration(new Duration(1, DAYS)); - config.setCacheDuration(new Duration(30, SECONDS)); - PrometheusTable table = client.getTable("default", "up"); - PrometheusSplitManager splitManager = new PrometheusSplitManager(client, new PrometheusClock(), config); - ConnectorSplitSource splits = splitManager.getSplits( - null, - null, - new PrometheusTableHandle("default", table.getName()), - null, - (DynamicFilter) null); - int numSplits = splits.getNextBatch(NOT_PARTITIONED, NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS).getNow(null).getSplits().size(); - assertEquals(numSplits, config.getMaxQueryRangeDuration().getValue(TimeUnit.SECONDS) / config.getQueryChunkSizeDuration().getValue(TimeUnit.SECONDS), - 0.001); - } -} diff --git a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationStatus.java b/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationStatus.java deleted file mode 100644 index 96ec110afc2f..000000000000 --- a/plugin/trino-prometheus/src/test/java/io/trino/plugin/prometheus/TestPrometheusIntegrationStatus.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.prometheus; - -import com.google.common.collect.ImmutableMap; -import io.airlift.log.Logger; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedResult; -import io.trino.testing.MaterializedRow; -import io.trino.testing.QueryRunner; -import okhttp3.HttpUrl; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import org.testng.annotations.Test; - -import java.util.concurrent.TimeUnit; - -import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusQueryRunner; -import static org.assertj.core.api.Assertions.assertThat; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.fail; - -/** - * Integration tests against Prometheus container - */ -@Test(singleThreaded = true) -public class TestPrometheusIntegrationStatus - extends AbstractTestQueryFramework -{ - private PrometheusServer server; - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - this.server = new PrometheusServer(); - return createPrometheusQueryRunner(server, ImmutableMap.of(), ImmutableMap.of()); - } - - @Test - public void testConfirmMetricAvailableAndCheckUp() - throws Exception - { - int maxTries = 60; - int timeBetweenTriesMillis = 1000; - int tries = 0; - final OkHttpClient httpClient = new OkHttpClient.Builder() - .connectTimeout(120, TimeUnit.SECONDS) - .readTimeout(120, TimeUnit.SECONDS) - .build(); - HttpUrl.Builder urlBuilder = HttpUrl.parse(server.getUri().toString()).newBuilder().encodedPath("/api/v1/query"); - urlBuilder.addQueryParameter("query", "up[1d]"); - String url = urlBuilder.build().toString(); - Request request = new Request.Builder() - .url(url) - .build(); - String responseBody; - // this seems to be a reliable way to ensure Prometheus has `up` metric data - while (tries < maxTries) { - responseBody = httpClient.newCall(request).execute().body().string(); - if (responseBody.contains("values")) { - Logger log = Logger.get(TestPrometheusIntegrationStatus.class); - log.info("prometheus response: %s", responseBody); - break; - } - Thread.sleep(timeBetweenTriesMillis); - tries++; - } - if (tries == maxTries) { - fail("Prometheus container not available for metrics query in " + maxTries * timeBetweenTriesMillis + " milliseconds."); - } - // now we're making sure the client is ready - tries = 0; - while (tries < maxTries) { - if (getQueryRunner().tableExists(getSession(), "up")) { - break; - } - Thread.sleep(timeBetweenTriesMillis); - tries++; - } - if (tries == maxTries) { - fail("Prometheus container, or client, not available for metrics query in " + maxTries * timeBetweenTriesMillis + " milliseconds."); - } - - MaterializedResult results = computeActual("SELECT * FROM prometheus.default.up LIMIT 1"); - assertEquals(results.getRowCount(), 1); - MaterializedRow row = results.getMaterializedRows().get(0); - assertEquals(row.getField(0).toString(), "{instance=localhost:9090, __name__=up, job=prometheus}"); - } - - @Test - public void testPushDown() - { - // default interval on the `up` metric that Prometheus records on itself is about 15 seconds, so this should only yield one or two row - MaterializedResult results = computeActual("SELECT * FROM prometheus.default.up WHERE timestamp > (NOW() - INTERVAL '15' SECOND)"); - assertThat(results).hasSizeBetween(1, 2); - } -}