Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move JDBC warnings to Statement #1640

Merged
merged 6 commits into from
Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public class PrestoConnection
private final Map<String, ClientSelectedRole> roles = new ConcurrentHashMap<>();
private final AtomicReference<String> transactionId = new AtomicReference<>();
private final QueryExecutor queryExecutor;
private final WarningsManager warningsManager = new WarningsManager();

PrestoConnection(PrestoDriverUri uri, QueryExecutor queryExecutor)
throws SQLException
Expand Down Expand Up @@ -722,11 +721,6 @@ void updateSession(StatementClient client)
}
}

WarningsManager getWarningsManager()
{
return warningsManager;
}

private void checkOpen()
throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class WarningsManager
final class WarningsManager
{
@GuardedBy("this")
private final Set<Warning.Code> warningsSeen = new HashSet<>();
Expand Down
87 changes: 30 additions & 57 deletions presto-jdbc/src/test/java/io/prestosql/jdbc/TestJdbcWarnings.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.AbstractFuture;
import io.prestosql.client.Warning;
import io.prestosql.execution.QueryInfo;
import io.prestosql.execution.warnings.WarningCollectorConfig;
import io.prestosql.plugin.blackhole.BlackHolePlugin;
import io.prestosql.plugin.tpch.TpchPlugin;
import io.prestosql.server.testing.TestingPrestoServer;
import io.prestosql.spi.PrestoWarning;
import io.prestosql.spi.WarningCode;
Expand All @@ -45,13 +42,15 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.prestosql.jdbc.TestPrestoDriver.closeQuietly;
import static io.prestosql.jdbc.TestPrestoDriver.waitForNodeRefresh;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -67,6 +66,7 @@ public class TestJdbcWarnings
private TestingPrestoServer server;
private Connection connection;
private Statement statement;
private ExecutorService executor;

@BeforeClass
public void setupServer()
Expand All @@ -76,11 +76,22 @@ public void setupServer()
.put("testing-warning-collector.add-warnings", "true")
.put("testing-warning-collector.preloaded-warnings", String.valueOf(PRELOADED_WARNINGS))
.build());
server.installPlugin(new TpchPlugin());
server.createCatalog("tpch", "tpch");
server.installPlugin(new BlackHolePlugin());
server.createCatalog("blackhole", "blackhole");
waitForNodeRefresh(server);

try (Connection connection = createConnection();
Statement statement = connection.createStatement()) {
statement.execute("CREATE SCHEMA blackhole.blackhole");
statement.executeUpdate("" +
"CREATE TABLE slow_table (x int) " +
"WITH (" +
" split_count = 1, " +
" pages_per_split = 5, " +
" rows_per_page = 3, " +
" page_processing_delay = '1s'" +
")");
}
}

@AfterClass(alwaysRun = true)
Expand All @@ -96,13 +107,15 @@ public void setup()
{
connection = createConnection();
statement = connection.createStatement();
executor = newCachedThreadPool(daemonThreadsNamed("test-%s"));
}

@AfterMethod(alwaysRun = true)
public void teardown()
{
closeQuietly(statement);
closeQuietly(connection);
executor.shutdownNow();
}

@Test
Expand All @@ -124,57 +137,42 @@ public void testStatementWarnings()
public void testLongRunningStatement()
throws SQLException, InterruptedException
{
ExecutorService queryExecutor = newSingleThreadExecutor(daemonThreadsNamed("test-%s"));
QueryCreationFuture queryCreationFuture = new QueryCreationFuture();
queryExecutor.submit(() -> {
try {
statement.execute("CREATE SCHEMA blackhole.blackhole");
statement.execute("CREATE TABLE blackhole.blackhole.test_table AS SELECT 1 AS col1 FROM tpch.sf1.lineitem CROSS JOIN tpch.sf1.lineitem");
electrum marked this conversation as resolved.
Show resolved Hide resolved
queryCreationFuture.set(null);
}
catch (Throwable e) {
queryCreationFuture.setException(e);
}
Future<?> future = executor.submit(() -> {
statement.execute("CREATE TABLE test_long_running AS SELECT * FROM slow_table");
return null;
});
while (statement.getWarnings() == null) {
Thread.sleep(100);
}
int expectedWarnings = 10;
SQLWarning warning = statement.getWarnings();
Set<WarningEntry> currentWarnings = new HashSet<>();
assertTrue(currentWarnings.add(new WarningEntry(warning)));
for (int warnings = 1; !queryCreationFuture.isDone() && warnings < 100; warnings++) {
for (int warnings = 1; !future.isDone() && warnings < expectedWarnings; warnings++) {
for (SQLWarning nextWarning = warning.getNextWarning(); nextWarning == null; nextWarning = warning.getNextWarning()) {
// Wait for new warnings
}
warning = warning.getNextWarning();
assertTrue(currentWarnings.add(new WarningEntry(warning)));
Thread.sleep(100);
}
assertEquals(currentWarnings.size(), 100);
queryExecutor.shutdownNow();
assertThat(currentWarnings).size().isGreaterThanOrEqualTo(expectedWarnings);
}

@Test
public void testLongRunningQuery()
throws SQLException, InterruptedException
{
ExecutorService queryExecutor = newSingleThreadExecutor(daemonThreadsNamed("test-%s"));
QueryCreationFuture queryCreationFuture = new QueryCreationFuture();
queryExecutor.submit(() -> {
try {
statement.execute("SELECT 1 AS col1 FROM tpch.sf1.lineitem CROSS JOIN tpch.sf1.lineitem");
queryCreationFuture.set(null);
}
catch (Throwable e) {
queryCreationFuture.setException(e);
}
Future<?> future = executor.submit(() -> {
statement.execute("SELECT * FROM slow_table");
return null;
});
while (statement.getResultSet() == null) {
Thread.sleep(100);
}
ResultSet resultSet = statement.getResultSet();
Set<WarningEntry> currentWarnings = new HashSet<>();
for (int rows = 0; !queryCreationFuture.isDone() && rows < 10; ) {
for (int rows = 0; !future.isDone() && rows < 10; ) {
if (resultSet.next()) {
for (SQLWarning warning = resultSet.getWarnings(); warning.getNextWarning() != null; warning = warning.getNextWarning()) {
assertTrue(currentWarnings.add(new WarningEntry(warning.getNextWarning())));
Expand All @@ -185,7 +183,6 @@ public void testLongRunningQuery()
}
Thread.sleep(100);
}
queryExecutor.shutdownNow();
}

@Test
Expand Down Expand Up @@ -268,11 +265,10 @@ private static void addWarnings(Set<WarningEntry> currentWarnings, SQLWarning ne
}
}

//TODO: this method seems to be copied in multiple test classes in this package, should it be moved to a utility?
private Connection createConnection()
throws SQLException
{
String url = format("jdbc:presto://%s", server.getAddress());
String url = format("jdbc:presto://%s/blackhole/blackhole", server.getAddress());
return DriverManager.getConnection(url, "test", null);
}

Expand Down Expand Up @@ -334,27 +330,4 @@ public int hashCode()
return Objects.hash(vendorCode, sqlState, message);
}
}

private static class QueryCreationFuture
extends AbstractFuture<QueryInfo>
electrum marked this conversation as resolved.
Show resolved Hide resolved
{
@Override
protected boolean set(QueryInfo value)
{
return super.set(value);
}

@Override
protected boolean setException(Throwable throwable)
{
return super.setException(throwable);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
// query submission can not be canceled
return false;
}
}
}