Skip to content

Commit

Permalink
CORE: (#1472)
Browse files Browse the repository at this point in the history
- Add RpcResponder for handling callbacks asynchronously
UTILS:
 - Add two convenient methods in Config

Signed-off-by: Chris Larsen <clarsen@verizonmedia.com>
  • Loading branch information
ZephyrGuo authored and manolama committed Jan 27, 2019
1 parent 4a44f10 commit 66a3397
Show file tree
Hide file tree
Showing 6 changed files with 311 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ tsdb_SRC := \
src/core/RequestBuilder.java \
src/core/RowKey.java \
src/core/RowSeq.java \
src/core/RpcResponder.java \
src/core/iRowSeq.java \
src/core/SaltScanner.java \
src/core/SeekableView.java \
Expand Down Expand Up @@ -317,6 +318,7 @@ test_SRC := \
test/core/TestRateSpan.java \
test/core/TestRowKey.java \
test/core/TestRowSeq.java \
test/core/TestRpcResponsder.java \
test/core/TestSaltScanner.java \
test/core/TestSpan.java \
test/core/TestSpanGroup.java \
Expand Down
110 changes: 110 additions & 0 deletions src/core/RpcResponder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2017 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import net.opentsdb.utils.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* This class is responsible for building result of requests and
* respond to clients asynchronously.
*
* It can reduce requests that stacking in AsyncHBase, especially put requests.
* When a HBase's RPC has completed, the "AsyncHBase I/O worker" just decodes
* the response, and then do callback by this class asynchronously. We should
* take up workers as short as possible time so that workers can remove RPCs
* from in-flight state more quickly.
*
*/
public class RpcResponder {

private static final Logger LOG = LoggerFactory.getLogger(RpcResponder.class);

public static final String TSD_RESPONSE_ASYNC_KEY = "tsd.core.response.async";
public static final boolean TSD_RESPONSE_ASYNC_DEFAULT = true;

public static final String TSD_RESPONSE_WORKER_NUM_KEY =
"tsd.core.response.worker.num";
public static final int TSD_RESPONSE_WORKER_NUM_DEFAULT = 10;

private final boolean async;
private ExecutorService responders;
private volatile boolean running = true;

RpcResponder(final Config config) {
async = config.getBoolean(TSD_RESPONSE_ASYNC_KEY,
TSD_RESPONSE_ASYNC_DEFAULT);

if (async) {
int threads = config.getInt(TSD_RESPONSE_WORKER_NUM_KEY,
TSD_RESPONSE_WORKER_NUM_DEFAULT);
responders = Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder()
.setNameFormat("OpenTSDB Responder #%d")
.setDaemon(true)
.setUncaughtExceptionHandler(new ExceptionHandler())
.build());
}

LOG.info("RpcResponder mode: {}", async ? "async" : "sync");
}

public void response(Runnable run) {
if (async) {
if (running) {
responders.execute(run);
} else {
throw new IllegalStateException("RpcResponder is closing or closed.");
}
} else {
run.run();
}
}

public void close() {
if (running) {
running = false;
responders.shutdown();
}

boolean completed;
try {
completed = responders.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
completed = false;
}

if (!completed) {
LOG.warn(
"There are still some results that are not returned to the clients.");
}
}

public boolean isAsync() {
return async;
}

private class ExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Run into an uncaught exception in thread: " + t.getName(), e);
}
}
}
51 changes: 43 additions & 8 deletions src/core/TSDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ public enum OperationMode {
/** Timer used for various tasks such as idle timeouts or query timeouts */
private final HashedWheelTimer timer;

/** RpcResponder for doing response asynchronously*/
private final RpcResponder rpcResponder;

/**
* Row keys that need to be compacted.
* Whenever we write a new data point to a row, we add the row key to this
Expand Down Expand Up @@ -343,7 +346,10 @@ public TSDB(final HBaseClient client, final Config config) {

// set any extra tags from the config for stats
StatsCollector.setGlobalTags(config);



rpcResponder = new RpcResponder(config);

LOG.debug(config.dumpConfiguration());
}

Expand Down Expand Up @@ -1657,20 +1663,43 @@ public String toString() {
}
}

final class RpcResponsderShutdown implements Callback<Object, Object> {
@Override
public Object call(Object arg) throws Exception {
try {
TSDB.this.rpcResponder.close();
} catch (Exception e) {
LOG.error(
"Run into unknown exception while closing RpcResponder.", e);
} finally {
return arg;
}
}
}

final class HClientShutdown implements Callback<Deferred<Object>, ArrayList<Object>> {
public Deferred<Object> call(final ArrayList<Object> args) {
public Deferred<Object> call(final ArrayList<Object> args) {
Callback<Object, Object> nextCallback;
if (storage_exception_handler != null) {
return client.shutdown().addBoth(new SEHShutdown());
nextCallback = new SEHShutdown();
} else {
nextCallback = new FinalShutdown();
}
return client.shutdown().addBoth(new FinalShutdown());

if (TSDB.this.rpcResponder.isAsync()) {
client.shutdown().addBoth(new RpcResponsderShutdown());
}

return client.shutdown().addBoth(nextCallback);
}
public String toString() {

public String toString() {
return "shutdown HBase client";
}
}

final class ShutdownErrback implements Callback<Object, Exception> {
public Object call(final Exception e) {
public Object call(final Exception e) {
final Logger LOG = LoggerFactory.getLogger(ShutdownErrback.class);
if (e instanceof DeferredGroupException) {
final DeferredGroupException ge = (DeferredGroupException) e;
Expand All @@ -1684,13 +1713,14 @@ public Object call(final Exception e) {
}
return new HClientShutdown().call(null);
}
public String toString() {

public String toString() {
return "shutdown HBase client after error";
}
}

final class CompactCB implements Callback<Object, ArrayList<Object>> {
public Object call(ArrayList<Object> compactions) throws Exception {
public Object call(ArrayList<Object> compactions) throws Exception {
return null;
}
}
Expand Down Expand Up @@ -2189,4 +2219,9 @@ final Deferred<Object> delete(final byte[] key, final byte[][] qualifiers) {
return client.delete(new DeleteRequest(table, key, FAMILY, qualifiers));
}

/** Do response by RpcResponder */
public void response(Runnable run) {
rpcResponder.response(run);
}

}
107 changes: 56 additions & 51 deletions src/tsd/PutDataPointRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -616,60 +616,65 @@ class GroupCB implements Callback<Object, ArrayList<Boolean>> {
public GroupCB(final int queued) {
this.queued = queued;
}

@Override
public Object call(final ArrayList<Boolean> results) {
if (sending_response.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Put data point call " + query + " was marked as timedout");
}
return null;
} else {
sending_response.set(true);
if (timeout != null) {
timeout.cancel();
}
}
int good_writes = 0;
int failed_writes = 0;
for (final boolean result : results) {
if (result) {
++good_writes;
} else {
++failed_writes;
}
}

final int failures = dps.size() - queued;
if (!show_summary && !show_details) {
if (failures + failed_writes > 0) {
query.sendReply(HttpResponseStatus.BAD_REQUEST,
query.serializer().formatErrorV1(
new BadRequestException(HttpResponseStatus.BAD_REQUEST,
"One or more data points had errors",
"Please see the TSD logs or append \"details\" to the put request")));
} else {
query.sendReply(HttpResponseStatus.NO_CONTENT, "".getBytes());
}
} else {
final HashMap<String, Object> summary = new HashMap<String, Object>();
if (sync_timeout > 0) {
summary.put("timeouts", 0);
}
summary.put("success", results.isEmpty() ? queued : good_writes);
summary.put("failed", failures + failed_writes);
if (show_details) {
summary.put("errors", details);
}

if (failures > 0) {
query.sendReply(HttpResponseStatus.BAD_REQUEST,
query.serializer().formatPutV1(summary));
} else {
query.sendReply(query.serializer().formatPutV1(summary));
tsdb.response(new Runnable() {
@Override
public void run() {
if (sending_response.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Put data point call " + query + " was marked as timedout");
}
return;
} else {
sending_response.set(true);
if (timeout != null) {
timeout.cancel();
}
}
int good_writes = 0;
int failed_writes = 0;
for (final boolean result : results) {
if (result) {
++good_writes;
} else {
++failed_writes;
}
}

final int failures = dps.size() - queued;
if (!show_summary && !show_details) {
if (failures + failed_writes > 0) {
query.sendReply(HttpResponseStatus.BAD_REQUEST,
query.serializer().formatErrorV1(
new BadRequestException(HttpResponseStatus.BAD_REQUEST,
"One or more data points had errors",
"Please see the TSD logs or append \"details\" to the put request")));
} else {
query.sendReply(HttpResponseStatus.NO_CONTENT, "".getBytes());
}
} else {
final HashMap<String, Object> summary = new HashMap<String, Object>();
if (sync_timeout > 0) {
summary.put("timeouts", 0);
}
summary.put("success", results.isEmpty() ? queued : good_writes);
summary.put("failed", failures + failed_writes);
if (show_details) {
summary.put("errors", details);
}

if (failures > 0) {
query.sendReply(HttpResponseStatus.BAD_REQUEST,
query.serializer().formatPutV1(summary));
} else {
query.sendReply(query.serializer().formatPutV1(summary));
}
}
}
}
});

return null;
}
@Override
Expand Down
35 changes: 35 additions & 0 deletions src/utils/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Properties;

import net.opentsdb.core.RpcResponder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -340,6 +341,23 @@ public final int getInt(final String property) {
return Integer.parseInt(sanitize(properties.get(property)));
}

/**
* Returns the given property as an integer.
* If no such property is specified, or if the specified value is not a valid
* <code>Int</code>, then <code>default_val</code> is returned.
*
* @param property The property to load
* @param default_val default value
* @return A parsed integer or default_val.
*/
public final int getInt(final String property, final int default_val) {
try {
return getInt(property);
} catch (Exception e) {
return default_val;
}
}

/**
* Returns the given string trimed or null if is null
* @param string The string be trimmed of
Expand Down Expand Up @@ -420,6 +438,23 @@ public final boolean getBoolean(final String property) {
return false;
}

/**
* Returns the given property as an boolean.
* If no such property is specified, or if the specified value is not a valid
* <code>boolean</code>, then <code>default_val</code> is returned.
*
* @param property The property to load
* @param default_val default value
* @return A parsed boolean or default_val.
*/
public final boolean getBoolean(final String property, final boolean default_val) {
try {
return getBoolean(property);
} catch (Exception e) {
return default_val;
}
}

/**
* Returns the directory name, making sure the end is an OS dependent slash
* @param property The property to load
Expand Down
Loading

0 comments on commit 66a3397

Please sign in to comment.