Skip to content

Commit

Permalink
Expose query retry policy via query event listener
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Jun 20, 2022
1 parent 85c6c09 commit bff3e64
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 7 deletions.
24 changes: 19 additions & 5 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.trino.metadata.Metadata;
import io.trino.metadata.SessionPropertyManager;
import io.trino.operator.OperatorStats;
import io.trino.operator.RetryPolicy;
import io.trino.operator.TableFinishInfo;
import io.trino.operator.TaskStats;
import io.trino.server.BasicQueryInfo;
Expand Down Expand Up @@ -140,7 +141,11 @@ public void queryCreatedEvent(BasicQueryInfo queryInfo)
eventListenerManager.queryCreated(
new QueryCreatedEvent(
queryInfo.getQueryStats().getCreateTime().toDate().toInstant(),
createQueryContext(queryInfo.getSession(), queryInfo.getResourceGroupId(), queryInfo.getQueryType()),
createQueryContext(
queryInfo.getSession(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getRetryPolicy()),
new QueryMetadata(
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
Expand Down Expand Up @@ -208,7 +213,11 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
ImmutableList.of(),
ImmutableList.of(),
Optional.empty()),
createQueryContext(queryInfo.getSession(), queryInfo.getResourceGroupId(), queryInfo.getQueryType()),
createQueryContext(
queryInfo.getSession(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getRetryPolicy()),
new QueryIOMetadata(ImmutableList.of(), Optional.empty()),
createQueryFailureInfo(failure, Optional.empty()),
ImmutableList.of(),
Expand All @@ -226,7 +235,11 @@ public void queryCompletedEvent(QueryInfo queryInfo)
new QueryCompletedEvent(
createQueryMetadata(queryInfo),
createQueryStatistics(queryInfo),
createQueryContext(queryInfo.getSession(), queryInfo.getResourceGroupId(), queryInfo.getQueryType()),
createQueryContext(
queryInfo.getSession(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getRetryPolicy()),
getQueryIOMetadata(queryInfo),
createQueryFailureInfo(queryInfo.getFailureInfo(), queryInfo.getOutputStage()),
queryInfo.getWarnings(),
Expand Down Expand Up @@ -305,7 +318,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
serializedPlanNodeStatsAndCosts);
}

private QueryContext createQueryContext(SessionRepresentation session, Optional<ResourceGroupId> resourceGroup, Optional<QueryType> queryType)
private QueryContext createQueryContext(SessionRepresentation session, Optional<ResourceGroupId> resourceGroup, Optional<QueryType> queryType, RetryPolicy retryPolicy)
{
return new QueryContext(
session.getUser(),
Expand All @@ -326,7 +339,8 @@ private QueryContext createQueryContext(SessionRepresentation session, Optional<
serverAddress,
serverVersion,
environment,
queryType);
queryType,
retryPolicy.toString());
}

private Optional<String> createTextQueryPlan(QueryInfo queryInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class QueryContext

private final Optional<QueryType> queryType;

private final String retryPolicy;

@JsonCreator
public QueryContext(
String user,
Expand All @@ -75,7 +77,8 @@ public QueryContext(
String serverAddress,
String serverVersion,
String environment,
Optional<QueryType> queryType)
Optional<QueryType> queryType,
String retryPolicy)
{
this.user = requireNonNull(user, "user is null");
this.principal = requireNonNull(principal, "principal is null");
Expand All @@ -96,6 +99,7 @@ public QueryContext(
this.serverVersion = requireNonNull(serverVersion, "serverVersion is null");
this.environment = requireNonNull(environment, "environment is null");
this.queryType = requireNonNull(queryType, "queryType is null");
this.retryPolicy = requireNonNull(retryPolicy, "retryMode is null");
}

@JsonProperty
Expand Down Expand Up @@ -211,4 +215,10 @@ public Optional<QueryType> getQueryType()
{
return queryType;
}

@JsonProperty
public String getRetryPolicy()
{
return retryPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class TestSpiBackwardCompatibility
.put("386", "Method: public default java.util.stream.Stream<io.trino.spi.connector.TableColumnsMetadata> io.trino.spi.connector.ConnectorMetadata.streamTableColumns(io.trino.spi.connector.ConnectorSession,io.trino.spi.connector.SchemaTablePrefix)")
.put("386", "Method: public default boolean io.trino.spi.connector.ConnectorMetadata.isSupportedVersionType(io.trino.spi.connector.ConnectorSession,io.trino.spi.connector.SchemaTableName,io.trino.spi.connector.PointerType,io.trino.spi.type.Type)")
.put("386", "Method: public static io.trino.spi.ptf.TableArgumentSpecification$Builder io.trino.spi.ptf.TableArgumentSpecification.builder(java.lang.String)")
.put("387", "Constructor: public io.trino.spi.eventlistener.QueryContext(java.lang.String,java.util.Optional<java.lang.String>,java.util.Set<java.lang.String>,java.util.Optional<java.lang.String>,java.util.Optional<java.lang.String>,java.util.Optional<java.lang.String>,java.util.Optional<java.lang.String>,java.util.Set<java.lang.String>,java.util.Set<java.lang.String>,java.util.Optional<java.lang.String>,java.util.Optional<java.lang.String>,java.util.Optional<java.lang.String>,java.util.Optional<io.trino.spi.resourcegroups.ResourceGroupId>,java.util.Map<java.lang.String, java.lang.String>,io.trino.spi.session.ResourceEstimates,java.lang.String,java.lang.String,java.lang.String,java.util.Optional<io.trino.spi.resourcegroups.QueryType>)")
.build();

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.trino.operator.RetryPolicy;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryContext;
Expand Down Expand Up @@ -115,7 +116,8 @@ public class TestHttpEventListener
new HashMap<>(), // sessionProperties
new ResourceEstimates(Optional.empty(), Optional.empty(), Optional.of(1000L)),
"serverAddress", "serverVersion", "environment",
Optional.of(QueryType.SELECT));
Optional.of(QueryType.SELECT),
RetryPolicy.QUERY.toString());

queryMetadata = new QueryMetadata(
"queryId",
Expand Down

0 comments on commit bff3e64

Please sign in to comment.