Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identify REMOTE_HOST_GONE for transport failure #7691

Merged
merged 3 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.client.ErrorLocation;
import com.facebook.presto.client.FailureInfo;
import com.facebook.presto.spi.ErrorCode;
import com.facebook.presto.spi.HostAddress;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -43,6 +44,8 @@ public class ExecutionFailureInfo
private final List<String> stack;
private final ErrorLocation errorLocation;
private final ErrorCode errorCode;
// used for transport errors
private final HostAddress remoteHost;

@JsonCreator
public ExecutionFailureInfo(
Expand All @@ -52,7 +55,8 @@ public ExecutionFailureInfo(
@JsonProperty("suppressed") List<ExecutionFailureInfo> suppressed,
@JsonProperty("stack") List<String> stack,
@JsonProperty("errorLocation") @Nullable ErrorLocation errorLocation,
@JsonProperty("errorCode") @Nullable ErrorCode errorCode)
@JsonProperty("errorCode") @Nullable ErrorCode errorCode,
@JsonProperty("remoteHost") @Nullable HostAddress remoteHost)
{
requireNonNull(type, "type is null");
requireNonNull(suppressed, "suppressed is null");
Expand All @@ -65,6 +69,7 @@ public ExecutionFailureInfo(
this.stack = ImmutableList.copyOf(stack);
this.errorLocation = errorLocation;
this.errorCode = errorCode;
this.remoteHost = remoteHost;
}

@NotNull
Expand Down Expand Up @@ -116,6 +121,13 @@ public ErrorCode getErrorCode()
return errorCode;
}

/**
 * Returns the remote host recorded for this failure.
 *
 * @return the remote host, or {@code null} when none was supplied at construction
 */
@Nullable
@JsonProperty
public HostAddress getRemoteHost()
{
    return this.remoteHost;
}

public FailureInfo toFailureInfo()
{
List<FailureInfo> suppressed = this.suppressed.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.execution.scheduler.NodeScheduler;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.execution.scheduler.SqlQueryScheduler;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableHandle;
Expand Down Expand Up @@ -100,6 +101,7 @@ public final class SqlQueryExecution
private final LocationFactory locationFactory;
private final int scheduleSplitBatchSize;
private final ExecutorService queryExecutor;
private final FailureDetector failureDetector;

private final QueryExplainer queryExplainer;
private final AtomicReference<SqlQueryScheduler> queryScheduler = new AtomicReference<>();
Expand All @@ -125,6 +127,7 @@ public SqlQueryExecution(QueryId queryId,
LocationFactory locationFactory,
int scheduleSplitBatchSize,
ExecutorService queryExecutor,
FailureDetector failureDetector,
NodeTaskMap nodeTaskMap,
QueryExplainer queryExplainer,
ExecutionPolicy executionPolicy,
Expand All @@ -142,6 +145,7 @@ public SqlQueryExecution(QueryId queryId,
this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.executionPolicy = requireNonNull(executionPolicy, "executionPolicy is null");
this.queryExplainer = requireNonNull(queryExplainer, "queryExplainer is null");
Expand Down Expand Up @@ -378,6 +382,7 @@ private void planDistribution(PlanRoot plan)
plan.isSummarizeTaskInfos(),
scheduleSplitBatchSize,
queryExecutor,
failureDetector,
rootOutputBuffers,
nodeTaskMap,
executionPolicy,
Expand Down Expand Up @@ -561,6 +566,7 @@ public static class SqlQueryExecutionFactory
private final QueryExplainer queryExplainer;
private final LocationFactory locationFactory;
private final ExecutorService executor;
private final FailureDetector failureDetector;
private final NodeTaskMap nodeTaskMap;
private final Map<String, ExecutionPolicy> executionPolicies;

Expand All @@ -578,6 +584,7 @@ public static class SqlQueryExecutionFactory
RemoteTaskFactory remoteTaskFactory,
TransactionManager transactionManager,
@ForQueryExecution ExecutorService executor,
FailureDetector failureDetector,
NodeTaskMap nodeTaskMap,
QueryExplainer queryExplainer,
Map<String, ExecutionPolicy> executionPolicies,
Expand All @@ -598,6 +605,7 @@ public static class SqlQueryExecutionFactory
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
requireNonNull(featuresConfig, "featuresConfig is null");
this.executor = requireNonNull(executor, "executor is null");
this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.queryExplainer = requireNonNull(queryExplainer, "queryExplainer is null");

Expand Down Expand Up @@ -631,6 +639,7 @@ public SqlQueryExecution createQueryExecution(QueryId queryId, String query, Ses
locationFactory,
scheduleSplitBatchSize,
executor,
failureDetector,
nodeTaskMap,
queryExplainer,
executionPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.scheduler.SplitSchedulerStats;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.metadata.RemoteTransactionHandle;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.Node;
Expand Down Expand Up @@ -50,7 +51,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.failureDetector.FailureDetector.State.GONE;
import static com.facebook.presto.operator.ExchangeOperator.REMOTE_CONNECTOR_ID;
import static com.facebook.presto.spi.StandardErrorCode.REMOTE_HOST_GONE;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -65,6 +68,7 @@ public final class SqlStageExecution
private final RemoteTaskFactory remoteTaskFactory;
private final NodeTaskMap nodeTaskMap;
private final boolean summarizeTaskInfo;
private final FailureDetector failureDetector;

private final Map<PlanFragmentId, RemoteSourceNode> exchangeSources;

Expand All @@ -89,6 +93,7 @@ public SqlStageExecution(
boolean summarizeTaskInfo,
NodeTaskMap nodeTaskMap,
ExecutorService executor,
FailureDetector failureDetector,
SplitSchedulerStats schedulerStats)
{
this(new StageStateMachine(
Expand All @@ -100,15 +105,17 @@ public SqlStageExecution(
requireNonNull(schedulerStats, "schedulerStats is null")),
remoteTaskFactory,
nodeTaskMap,
summarizeTaskInfo);
summarizeTaskInfo,
failureDetector);
}

public SqlStageExecution(StageStateMachine stateMachine, RemoteTaskFactory remoteTaskFactory, NodeTaskMap nodeTaskMap, boolean summarizeTaskInfo)
public SqlStageExecution(StageStateMachine stateMachine, RemoteTaskFactory remoteTaskFactory, NodeTaskMap nodeTaskMap, boolean summarizeTaskInfo, FailureDetector failureDetector)
{
this.stateMachine = stateMachine;
this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.summarizeTaskInfo = summarizeTaskInfo;
this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");

ImmutableMap.Builder<PlanFragmentId, RemoteSourceNode> fragmentToExchangeSource = ImmutableMap.builder();
for (RemoteSourceNode remoteSourceNode : stateMachine.getFragment().getRemoteSourceNodes()) {
Expand Down Expand Up @@ -389,6 +396,7 @@ public void stateChanged(TaskStatus taskStatus)
if (taskState == TaskState.FAILED) {
RuntimeException failure = taskStatus.getFailures().stream()
.findFirst()
.map(this::rewriteTransportFailure)
.map(ExecutionFailureInfo::toException)
.orElse(new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason"));
stateMachine.transitionToFailed(failure);
Expand Down Expand Up @@ -418,5 +426,25 @@ private synchronized void updateMemoryUsage(TaskStatus taskStatus)
previousMemory = currentMemory;
stateMachine.updateMemoryUsage(deltaMemoryInBytes);
}

/**
 * Re-labels a failure with the {@code REMOTE_HOST_GONE} error code when it names a
 * remote host that the failure detector currently reports as {@code GONE}.
 *
 * @param executionFailureInfo the failure reported by a task
 * @return a copy of the failure carrying the {@code REMOTE_HOST_GONE} error code, or the
 *         same instance when it has no remote host or that host is not {@code GONE}
 */
private ExecutionFailureInfo rewriteTransportFailure(ExecutionFailureInfo executionFailureInfo)
{
    // Failures without a remote host are not transport failures; leave them untouched.
    if (executionFailureInfo.getRemoteHost() == null) {
        return executionFailureInfo;
    }

    // Only rewrite when the failure detector positively classifies the host as GONE.
    if (failureDetector.getState(executionFailureInfo.getRemoteHost()) != GONE) {
        return executionFailureInfo;
    }

    // Same failure in every respect except for the error code.
    return new ExecutionFailureInfo(
            executionFailureInfo.getType(),
            executionFailureInfo.getMessage(),
            executionFailureInfo.getCause(),
            executionFailureInfo.getSuppressed(),
            executionFailureInfo.getStack(),
            executionFailureInfo.getErrorLocation(),
            REMOTE_HOST_GONE.toErrorCode(),
            executionFailureInfo.getRemoteHost());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.execution.StageId;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.StageState;
import com.facebook.presto.failureDetector.FailureDetector;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.split.SplitSource;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class SqlQueryScheduler
private final ExecutionPolicy executionPolicy;
private final Map<StageId, SqlStageExecution> stages;
private final ExecutorService executor;
private final FailureDetector failureDetector;
private final StageId rootStageId;
private final Map<StageId, StageScheduler> stageSchedulers;
private final Map<StageId, StageLinkage> stageLinkages;
Expand All @@ -108,6 +110,7 @@ public SqlQueryScheduler(QueryStateMachine queryStateMachine,
boolean summarizeTaskInfo,
int splitBatchSize,
ExecutorService executor,
FailureDetector failureDetector,
OutputBuffers rootOutputBuffers,
NodeTaskMap nodeTaskMap,
ExecutionPolicy executionPolicy,
Expand Down Expand Up @@ -136,6 +139,7 @@ public SqlQueryScheduler(QueryStateMachine queryStateMachine,
splitBatchSize,
partitioningHandle -> partitioningCache.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(session, handle)),
executor,
failureDetector,
nodeTaskMap,
stageSchedulers,
stageLinkages);
Expand All @@ -151,6 +155,7 @@ public SqlQueryScheduler(QueryStateMachine queryStateMachine,
this.stageLinkages = stageLinkages.build();

this.executor = executor;
this.failureDetector = failureDetector;

rootStage.addStateChangeListener(state -> {
if (state == FINISHED) {
Expand Down Expand Up @@ -195,6 +200,7 @@ private List<SqlStageExecution> createStages(
int splitBatchSize,
Function<PartitioningHandle, NodePartitionMap> partitioningCache,
ExecutorService executor,
FailureDetector failureDetector,
NodeTaskMap nodeTaskMap,
ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers,
ImmutableMap.Builder<StageId, StageLinkage> stageLinkages)
Expand All @@ -211,6 +217,7 @@ private List<SqlStageExecution> createStages(
summarizeTaskInfo,
nodeTaskMap,
executor,
failureDetector,
schedulerStats);

stages.add(stage);
Expand Down Expand Up @@ -266,6 +273,7 @@ private List<SqlStageExecution> createStages(
splitBatchSize,
partitioningCache,
executor,
failureDetector,
nodeTaskMap,
stageSchedulers,
stageLinkages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,22 @@
*/
package com.facebook.presto.failureDetector;

import com.facebook.presto.spi.HostAddress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inquisition is the wrong word.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FailureDetector now attempts to determine the state of a node based on the last exception encountered.

import io.airlift.discovery.client.ServiceDescriptor;

import java.util.Set;

/**
 * Reports which monitored services have failed and classifies the state of individual hosts.
 */
public interface FailureDetector
{
/**
 * Returns the descriptors of the services this detector currently considers failed.
 */
Set<ServiceDescriptor> getFailed();

/**
 * Returns this detector's classification of the given host.
 *
 * @param hostAddress address of the host to classify
 * @return one of the {@link State} values
 */
State getState(HostAddress hostAddress);

/**
 * Classification of a host as returned by {@link #getState(HostAddress)}.
 * NOTE(review): the exact criteria for each value are decided by the implementation —
 * confirm against the concrete detector before relying on them.
 */
enum State
{
UNKNOWN,
ALIVE,
GONE,
UNRESPONSIVE,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.failureDetector;

import com.facebook.presto.spi.HostAddress;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
Expand All @@ -37,8 +38,11 @@
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -52,6 +56,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.failureDetector.FailureDetector.State.ALIVE;
import static com.facebook.presto.failureDetector.FailureDetector.State.GONE;
import static com.facebook.presto.failureDetector.FailureDetector.State.UNKNOWN;
import static com.facebook.presto.failureDetector.FailureDetector.State.UNRESPONSIVE;
import static com.facebook.presto.spi.HostAddress.fromUri;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
Expand Down Expand Up @@ -143,6 +152,31 @@ public Set<ServiceDescriptor> getFailed()
.collect(toImmutableSet());
}

/**
 * Classifies the given host from the monitoring task whose URI resolves to it.
 *
 * <p>The first matching task decides the result: {@code ALIVE} when the task is not failed;
 * otherwise {@code GONE} when the last recorded failure is a {@link SocketTimeoutException}
 * or {@link UnknownHostException}, {@code UNRESPONSIVE} when it is a {@link ConnectException},
 * and {@code UNKNOWN} for any other (or no) recorded exception.
 *
 * @param hostAddress address of the host to classify
 * @return the state derived as described above, or {@code UNKNOWN} when no task matches
 */
@Override
public State getState(HostAddress hostAddress)
{
    for (MonitoringTask monitoringTask : tasks.values()) {
        if (!hostAddress.equals(fromUri(monitoringTask.uri))) {
            continue;
        }

        if (!monitoringTask.isFailed()) {
            return ALIVE;
        }

        // NOTE(review): only the recorded exception's own type is inspected, not its causes —
        // confirm that failures are recorded unwrapped.
        Exception lastFailure = monitoringTask.getStats().getLastFailureException();
        boolean timedOutOrUnresolvable = lastFailure instanceof SocketTimeoutException
                || lastFailure instanceof UnknownHostException;
        if (timedOutOrUnresolvable) {
            return GONE;
        }

        return (lastFailure instanceof ConnectException) ? UNRESPONSIVE : UNKNOWN;
    }

    // No monitoring task is registered for this host.
    return UNKNOWN;
}

@Managed(description = "Number of failed services")
public int getFailedCount()
{
Expand Down Expand Up @@ -361,6 +395,7 @@ public static class Stats
private final DecayCounter recentSuccesses = new DecayCounter(ExponentialDecay.oneMinute());
private final AtomicReference<DateTime> lastRequestTime = new AtomicReference<>();
private final AtomicReference<DateTime> lastResponseTime = new AtomicReference<>();
private final AtomicReference<Exception> lastFailureException = new AtomicReference<>();

@GuardedBy("this")
private final Map<Class<? extends Throwable>, DecayCounter> failureCountByType = new HashMap<>();
Expand All @@ -386,6 +421,7 @@ public void recordFailure(Exception exception)
{
recentFailures.add(1);
lastResponseTime.set(new DateTime());
lastFailureException.set(exception);

Throwable cause = exception;
while (cause.getClass() == RuntimeException.class && cause.getCause() != null) {
Expand Down Expand Up @@ -450,6 +486,12 @@ public DateTime getLastResponseTime()
return lastResponseTime.get();
}

/**
 * Returns the current value of the last-failure-exception reference.
 *
 * @return the stored exception, or {@code null} if the reference has not been set
 */
@JsonProperty
public Exception getLastFailureException()
{
    return this.lastFailureException.get();
}

@JsonProperty
public synchronized Map<String, Double> getRecentFailuresByType()
{
Expand Down
Loading