From c11078b93af09b61e57e4a653d3a33494725cc1d Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sun, 21 Jun 2020 16:28:00 -0700 Subject: [PATCH] [SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeoutChecker thread properly upon shutdown ### What changes were proposed in this pull request? This PR port https://issues.apache.org/jira/browse/HIVE-14817 for spark thrift server. ### Why are the changes needed? When stopping the HiveServer2, the non-daemon thread stops the server from terminating ```sql "HiveServer2-Background-Pool: Thread-79" #79 prio=5 os_prio=31 tid=0x00007fde26138800 nid=0x13713 waiting on condition [0x0000700010c32000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.hive.service.cli.session.SessionManager$1.sleepInterval(SessionManager.java:178) at org.apache.hive.service.cli.session.SessionManager$1.run(SessionManager.java:156) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` Here is an example to reproduce: https://github.com/yaooqinn/kyuubi/blob/master/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/spark/SparkSQLEngineApp.scala Also, it causes issues as HIVE-14817 described which ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Passing Jenkins Closes #28870 from yaooqinn/SPARK-32034. Authored-by: Kent Yao Signed-off-by: Dongjoon Hyun (cherry picked from commit 9f8e15bb2e2189812ee34e3e64baede0d799ba76) Signed-off-by: Dongjoon Hyun --- .../service/cli/session/SessionManager.java | 32 ++++++++++++++----- .../service/cli/session/SessionManager.java | 32 ++++++++++++++----- 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java index 859f9c8b449e5..ad6fb3ba37a0e 100644 --- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/SessionManager.java @@ -148,14 +148,20 @@ public synchronized void start() { } } + private final Object timeoutCheckerLock = new Object(); + private void startTimeoutChecker() { final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds - Runnable timeoutChecker = new Runnable() { + final Runnable timeoutChecker = new Runnable() { @Override public void run() { - for (sleepInterval(interval); !shutdown; sleepInterval(interval)) { + sleepFor(interval); + while (!shutdown) { long current = System.currentTimeMillis(); for (HiveSession session : new ArrayList(handleToSession.values())) { + if (shutdown) { + break; + } if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) { SessionHandle handle = session.getSessionHandle(); @@ -170,24 +176,34 @@ public void run() { session.closeExpiredOperations(); } } + sleepFor(interval); } } - private void sleepInterval(long interval) { - try { - Thread.sleep(interval); - } catch (InterruptedException e) { - // ignore + private void sleepFor(long interval) { + synchronized (timeoutCheckerLock) { + try { + timeoutCheckerLock.wait(interval); + } catch (InterruptedException e) { + // Ignore, and break. + } } } }; backgroundOperationPool.execute(timeoutChecker); } + private void shutdownTimeoutChecker() { + shutdown = true; + synchronized (timeoutCheckerLock) { + timeoutCheckerLock.notify(); + } + } + @Override public synchronized void stop() { super.stop(); - shutdown = true; + shutdownTimeoutChecker(); if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); long timeout = hiveConf.getTimeVar( diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java index 49221b13bb892..5a381d170b4f9 100644 --- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/SessionManager.java @@ -148,14 +148,20 @@ public synchronized void start() { } } + private final Object timeoutCheckerLock = new Object(); + private void startTimeoutChecker() { final long interval = Math.max(checkInterval, 3000L); // minimum 3 seconds - Runnable timeoutChecker = new Runnable() { + final Runnable timeoutChecker = new Runnable() { @Override public void run() { - for (sleepInterval(interval); !shutdown; sleepInterval(interval)) { + sleepFor(interval); + while (!shutdown) { long current = System.currentTimeMillis(); for (HiveSession session : new ArrayList(handleToSession.values())) { + if (shutdown) { + break; + } if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current && (!checkOperation || session.getNoOperationTime() > sessionTimeout)) { SessionHandle handle = session.getSessionHandle(); @@ -170,24 +176,34 @@ public void run() { session.closeExpiredOperations(); } } + sleepFor(interval); } } - private void sleepInterval(long interval) { - try { - Thread.sleep(interval); - } catch (InterruptedException e) { - // ignore + private void sleepFor(long interval) { + synchronized (timeoutCheckerLock) { + try { + timeoutCheckerLock.wait(interval); + } catch (InterruptedException e) { + // Ignore, and break. + } } } }; backgroundOperationPool.execute(timeoutChecker); } + private void shutdownTimeoutChecker() { + shutdown = true; + synchronized (timeoutCheckerLock) { + timeoutCheckerLock.notify(); + } + } + @Override public synchronized void stop() { super.stop(); - shutdown = true; + shutdownTimeoutChecker(); if (backgroundOperationPool != null) { backgroundOperationPool.shutdown(); long timeout = hiveConf.getTimeVar(