An increasing number of WAITING state threads in JDBC driver #14176

Closed
zhuyaogai opened this issue Sep 18, 2022 · 4 comments · Fixed by #13775
Labels
bug Something isn't working jdbc Relates to Trino JDBC driver

@zhuyaogai

version: presto-jdbc:329
jdk version: 1.8.0_261

MAX_QUEUED_ROWS is declared in AsyncIterator and bounds rowQueue, an ArrayBlockingQueue. As a result, the asynchronous threads named "Presto JDBC worker-%d" block in rowQueue.put() once the queue holds MAX_QUEUED_ROWS rows that nobody has consumed yet.

        private static final int MAX_QUEUED_ROWS = 50_000;
        private static final ExecutorService executorService = newCachedThreadPool(daemonThreadsNamed("Presto JDBC worker-%d"));

        private final StatementClient client;
        private final BlockingQueue<T> rowQueue = new ArrayBlockingQueue<>(MAX_QUEUED_ROWS);
        private final CompletableFuture<Void> future;
        private Thread parent;

        public AsyncIterator(Iterator<T> dataIterator, StatementClient client, Thread parent)
        {
            requireNonNull(dataIterator, "dataIterator is null");
            this.parent = parent;
            this.client = client;
            this.future = CompletableFuture.supplyAsync(() -> {
                while (dataIterator.hasNext()) {
                    try {
                        // blocks while the queue already holds MAX_QUEUED_ROWS (50_000) rows
                        rowQueue.put(dataIterator.next());
                    }
                    catch (InterruptedException e) {
                        interrupt(e);
                    }
                }
                return null;
            }, executorService);
        }

        public void cancel()
        {
            future.cancel(true);
        }

        public void interrupt(InterruptedException e)
        {
            client.close();
            Thread.currentThread().interrupt();
            throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
        }

When a user submits a SQL query whose result is larger than MAX_QUEUED_ROWS, the queue fills up. If the user then calls close() without taking the rows out of the queue, the worker thread stays blocked in rowQueue.put(), and the number of WAITING threads keeps growing over time.
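The blocking comes from ArrayBlockingQueue.put(), which waits for free space when the queue is at capacity. A minimal standalone sketch (not driver code; the class name, the capacity of 2 standing in for MAX_QUEUED_ROWS, and the thread name "demo-worker" are illustrative assumptions) shows a producer ending up in WAITING once nobody drains the queue:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueBlockDemo {
    /**
     * Starts a producer that puts `rows` items into `rowQueue` and never has them consumed.
     * Returns once the producer is parked inside put(). `rows` must exceed the queue capacity,
     * otherwise the producer finishes instead of blocking.
     */
    static Thread startUnconsumedProducer(BlockingQueue<Integer> rowQueue, int rows)
            throws InterruptedException
    {
        Thread producer = new Thread(() -> {
            try {
                for (int row = 0; row < rows; row++) {
                    rowQueue.put(row); // blocks while the queue is full
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // only an interrupt can release it
            }
        }, "demo-worker");
        producer.setDaemon(true); // daemon, like the driver's worker threads
        producer.start();

        // Nobody calls take(), like a ResultSet closed before its rows are fetched.
        while (producer.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        return producer;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> rowQueue = new ArrayBlockingQueue<>(2);
        Thread producer = startUnconsumedProducer(rowQueue, 5);
        // prints "demo-worker WAITING, queued rows = 2"
        System.out.println(producer.getName() + " " + producer.getState()
                + ", queued rows = " + rowQueue.size());
    }
}
```

Only an interrupt, or a consumer calling take(), can release such a producer, which is why the way the background task is cancelled matters.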

The code to close the query is shown below.

    @Override
    public void close()
            throws SQLException
    {
        closed.set(true);
        ((AsyncIterator) results).cancel();
        client.close();
    }

The close() method eventually calls future.cancel(true). The source of cancel() in CompletableFuture (JDK 8) is shown below.

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            internalComplete(new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

As the Javadoc says, the parameter mayInterruptIfRunning has no effect in this implementation because interrupts are not used to control processing. cancel(true) therefore never interrupts a worker blocked in rowQueue.put(), so the threads named "Presto JDBC worker-%d" stay blocked forever. The relevant code has not changed much in the latest version.
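A minimal standalone sketch of the same effect, independent of the driver (the class name, executor, thread name and queue size are illustrative assumptions): a task started with CompletableFuture.supplyAsync blocks in put() on a full queue, and cancel(true) marks the future as cancelled while the worker thread stays blocked.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class CompletableFutureCancelDemo {
    /** Blocks a supplyAsync task on a full queue, cancels it, and returns the worker's state. */
    static Thread.State workerStateAfterCancel() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable, "demo-worker");
            thread.setDaemon(true); // do not keep the JVM alive
            return thread;
        });
        BlockingQueue<Integer> rowQueue = new ArrayBlockingQueue<>(1);
        AtomicReference<Thread> worker = new AtomicReference<>();

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            worker.set(Thread.currentThread());
            try {
                while (true) {
                    rowQueue.put(0); // the first put fills the queue, the second blocks
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null; // only reachable if something interrupts this thread
            }
        }, executor);

        // Wait until the worker is parked inside put().
        while (worker.get() == null || worker.get().getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }

        future.cancel(true); // mayInterruptIfRunning is ignored by CompletableFuture
        Thread.sleep(200);   // leave time for an interrupt to land, if there were one
        executor.shutdown(); // shutdown() does not interrupt running tasks either
        System.out.println("future cancelled: " + future.isCancelled());
        return worker.get().getState();
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "future cancelled: true" and then "worker after cancel(true): WAITING"
        System.out.println("worker after cancel(true): " + workerStateAfterCancel());
    }
}
```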

Please correct me if I'm wrong :)

@findepi
Member

findepi commented Sep 18, 2022

version: presto-jdbc:329

thanks for providing the version!

Can you please also check whether the problem is reproducible with the latest JDBC driver (396)?

@findepi findepi added bug Something isn't working jdbc Relates to Trino JDBC driver labels Sep 18, 2022
@zhuyaogai
Author

I ran some simple tests and confirmed that the problem is also reproducible with the latest JDBC driver (396).

My demo code is shown below.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class TrinoTest {
    static final int N = 10;
    static final String DB_URL = "jdbc:trino://localhost:8080/hive/default";
    static final String USER = "hive";

    public static void main(String[] args) throws InterruptedException {
        // simple query sql, such as `select * from table`, the amount of data is larger than MAX_QUEUED_ROWS
        String sql = "xxx";
        for (int i = 0; i < N; ++i) {
            try (Connection conn = DriverManager.getConnection(DB_URL, USER, null); Statement stmt = conn.createStatement()) {
                boolean hasResult = stmt.execute(sql);
                System.out.println(hasResult);
                // simulate some operations, but actually don't fetch results finally.
                Thread.sleep(3000);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        Thread.sleep(100000000);
    }
}

We see many threads in the WAITING state.
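To watch the count grow while a reproduction like the one above runs, the live threads can be inspected from inside the same JVM. A minimal sketch using Thread.getAllStackTraces(); the class and method names are hypothetical, and the name prefix comes from the 329 code quoted earlier, so it may differ in other driver versions.

```java
public class WorkerThreadCounter {
    /** Counts live threads whose name starts with namePrefix and that are in the WAITING state. */
    static long countWaitingThreads(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(thread -> thread.getName().startsWith(namePrefix))
                .filter(thread -> thread.getState() == Thread.State.WAITING)
                .count();
    }

    public static void main(String[] args) {
        // Prefix taken from daemonThreadsNamed("Presto JDBC worker-%d"); an assumption for newer drivers.
        System.out.println("waiting workers: " + countWaitingThreads("Presto JDBC worker-"));
    }
}
```

Calling countWaitingThreads(...) after each iteration of the demo loop would be expected to show one more blocked worker per abandoned query.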

We analyzed the monitoring metrics and found that the threads were permanently blocked. This may also lead to a memory leak: each blocked thread keeps its rowQueue, holding up to MAX_QUEUED_ROWS rows, reachable, so it cannot be garbage-collected. I have some simple ideas on how to fix this problem.

Please correct me if I'm wrong :)

@findepi
Member

findepi commented Sep 19, 2022

I was able to reproduce this locally with

@Test(timeOut = 60_000)
public void test()
        throws Exception
{
    int rowsToRead = TrinoResultSet.MAX_QUEUED_ROWS /* TrinoResultSet.AsyncIterator.MAX_QUEUED_ROWS */ + 5;
    /*try (Connection connection = createConnection())*/ {
        for (int i = 0; i < 30; i++) {
            try (Connection connection = createConnection(); Statement statement = connection.createStatement()) {
                boolean hasResultSet = statement.execute(format(
                        "SELECT * FROM UNNEST(sequence(0, %s)) CROSS JOIN UNNEST(sequence(0, %s))",
                        1000,
                        IntMath.divide(rowsToRead, 1000, UP)));
                assertThat(hasResultSet).as("hasResultSet").isTrue();
                Thread.sleep(1000); // let the AsyncIterator queue to fill up
            }
        }
    }

    System.out.println();
    Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .filter(name -> name.toLowerCase(Locale.ROOT).contains("jdbc"))
            .forEach(System.out::println);
}

and it seems #13775 fixes the problem. @zhuyaogai, would you be able to confirm that?

@zhuyaogai
Author

zhuyaogai commented Sep 19, 2022

Indeed, it seems #13775 fixes the problem.

The solution is similar to what I had in mind:

  • Use FutureTask instead of CompletableFuture
  • Make sure to call rowQueue.clear() after client.close()
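The two ideas above can be sketched in standalone form. This is a minimal illustration under stated assumptions, not the code merged in #13775: the class InterruptibleRowProducer, its methods and the "sketch-worker" thread name are hypothetical, and there is no StatementClient, so rowQueue.clear() simply follows the cancellation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class InterruptibleRowProducer {
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "sketch-worker");
        thread.setDaemon(true);
        return thread;
    });

    private final BlockingQueue<Integer> rowQueue;
    private final FutureTask<Void> task;
    private final CountDownLatch producerExited = new CountDownLatch(1);

    InterruptibleRowProducer(int capacity, int rows) {
        rowQueue = new ArrayBlockingQueue<>(capacity);
        task = new FutureTask<>(() -> {
            try {
                for (int row = 0; row < rows; row++) {
                    rowQueue.put(row); // throws InterruptedException once the task is cancelled
                }
                return null;
            }
            finally {
                producerExited.countDown(); // records that the producer left put()
            }
        });
        EXECUTOR.execute(task);
    }

    int queuedRows() {
        return rowQueue.size();
    }

    void close() {
        // Unlike CompletableFuture, FutureTask.cancel(true) interrupts the thread running
        // the task, so a put() blocked on a full queue throws and the task returns.
        task.cancel(true);
        // In the driver this would follow client.close(); dropping buffered rows lets
        // them be garbage-collected even if the iterator object stays reachable.
        rowQueue.clear();
    }

    boolean awaitProducerExit(long timeoutMillis) throws InterruptedException {
        return producerExited.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        InterruptibleRowProducer producer = new InterruptibleRowProducer(2, 5);
        while (producer.queuedRows() < 2) {
            Thread.sleep(10); // wait until the queue is full
        }
        producer.close();
        // prints "producer exited: true"
        System.out.println("producer exited: " + producer.awaitProducerExit(1_000));
    }
}
```

The design relies on FutureTask.cancel(true) interrupting the running thread, which CompletableFuture.cancel does not do. In rare interleavings one row can still be enqueued between cancel(true) and clear(), so the sketch only guarantees that the worker thread exits.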
