Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop opensearch: Graceful close client connection #16091

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HttpTransportSettings.SETTING_HTTP_BIND_HOST,
HttpTransportSettings.SETTING_HTTP_PORT,
HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT,
HttpTransportSettings.SETTING_HTTP_GRACEFUL_SHUTDOWN,
HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS,
HttpTransportSettings.SETTING_HTTP_COMPRESSION,
HttpTransportSettings.SETTING_HTTP_COMPRESSION_LEVEL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.network.CloseableChannel;
import org.opensearch.common.network.NetworkAddress;
Expand Down Expand Up @@ -76,10 +77,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_GRACEFUL_SHUTDOWN;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
Expand Down Expand Up @@ -240,6 +243,39 @@ protected void doStop() {
}
}

long gracefulShutdownMillis = SETTING_HTTP_GRACEFUL_SHUTDOWN.get(settings).getMillis();
if (gracefulShutdownMillis > 0 && httpChannels.size() > 0) {

logger.info(
"There are {} open client connections, try to close gracefully within {}ms.",
httpChannels.size(),
gracefulShutdownMillis
);

// Close connection after the client is getting a response.
handlingSettings.setForceCloseConnection();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @tmanninger , getting to it again just now. So I think this is what we want:

  • try to send the "in-flight" response to the clients within gracefulShutdownMillis timeframe
  • close the connection (forceably or not)

I believe we could make it part of the RestChannel (and leave AbstractHttpServerTransport out of it) by introducing RestChannel::closeGracefuly(long timeout) method and calling it in CloseableChannel.closeChannels (would need some adjustments).


// Wait until all connections are closed or the graceful timeout is reached.
final long startTimeMillis = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimeMillis < gracefulShutdownMillis) {
if (httpChannels.isEmpty()) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
}

if (!httpChannels.isEmpty()) {
logger.info("Timeout reached, {} connections not closed gracefully.", httpChannels.size());
} else {
logger.info("Closed all connections gracefully.");
}
}

// Close all channels that are not yet closed
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void sendResponse(RestResponse restResponse) {
Releasables.closeWhileHandlingException(httpRequest::release);

final ArrayList<Releasable> toClose = new ArrayList<>(3);
if (HttpUtils.shouldCloseConnection(httpRequest)) {
if (settings.forceCloseConnection() || HttpUtils.shouldCloseConnection(httpRequest)) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}

Expand Down Expand Up @@ -151,6 +151,14 @@ public void sendResponse(RestResponse restResponse) {
setHeaderField(httpResponse, X_OPAQUE_ID, opaque);
}

if (settings.forceCloseConnection()) {
// Server is in shutdown mode. Send client to close the connection.
// -------------
// Only works with http1
// How to check request httpversion is 2 AND send goaway signal?
setHeaderField(httpResponse, CONNECTION, CLOSE);
}

// Add all custom headers
addCustomHeaders(httpResponse, restResponse.getHeaders());
addCustomHeaders(httpResponse, threadContext.getResponseHeaders());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class HttpHandlingSettings {
private final long readTimeoutMillis;
private boolean corsEnabled;

// Close the connection after response is sent to the client (even if keep-alive is set)
private boolean forceCloseConnection;

public HttpHandlingSettings(
int maxContentLength,
int maxChunkSize,
Expand All @@ -89,6 +92,7 @@ public HttpHandlingSettings(
this.pipeliningMaxEvents = pipeliningMaxEvents;
this.readTimeoutMillis = readTimeoutMillis;
this.corsEnabled = corsEnabled;
this.forceCloseConnection = false;
}

public static HttpHandlingSettings fromSettings(Settings settings) {
Expand Down Expand Up @@ -150,4 +154,12 @@ public long getReadTimeoutMillis() {
public boolean isCorsEnabled() {
return corsEnabled;
}

public void setForceCloseConnection() {
forceCloseConnection = true;
}

public boolean forceCloseConnection() {
return forceCloseConnection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ public final class HttpTransportSettings {
Property.NodeScope
);
public static final Setting<Integer> SETTING_HTTP_PUBLISH_PORT = intSetting("http.publish_port", -1, -1, Property.NodeScope);

public static final Setting<TimeValue> SETTING_HTTP_GRACEFUL_SHUTDOWN = Setting.timeSetting(
"http.graceful_shutdown",
new TimeValue(0),
new TimeValue(0),
Property.NodeScope
);

public static final Setting<Boolean> SETTING_HTTP_DETAILED_ERRORS_ENABLED = Setting.boolSetting(
"http.detailed_errors.enabled",
true,
Expand Down
Loading