Skip to content

Commit

Permalink
Add anonymized query plan in json format to QueryCompletedEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav8297 authored and sopel39 committed Aug 26, 2022
1 parent 240ed0a commit 7f30476
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 14 deletions.
41 changes: 34 additions & 7 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.PlanVisitor;
import io.trino.sql.planner.planprinter.Anonymizer;
import io.trino.sql.planner.planprinter.CounterBasedAnonymizer;
import io.trino.sql.planner.planprinter.NoOpAnonymizer;
import io.trino.sql.planner.planprinter.ValuePrinter;
import io.trino.transaction.TransactionId;
Expand All @@ -87,6 +89,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.execution.QueryState.QUEUED;
import static io.trino.execution.StageInfo.getAllStages;
import static io.trino.sql.planner.planprinter.PlanPrinter.jsonDistributedPlan;
import static io.trino.sql.planner.planprinter.PlanPrinter.textDistributedPlan;
import static java.lang.Math.max;
import static java.lang.Math.toIntExact;
Expand Down Expand Up @@ -160,12 +163,13 @@ public void queryCreatedEvent(BasicQueryInfo queryInfo)
ImmutableList.of(),
queryInfo.getSelf(),
Optional.empty(),
Optional.empty(),
Optional.empty())));
}

public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailureInfo failure)
{
eventListenerManager.queryCompleted(new QueryCompletedEvent(
eventListenerManager.queryCompleted(requiresAnonymizedPlan -> new QueryCompletedEvent(
new QueryMetadata(
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
Expand All @@ -177,6 +181,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
ImmutableList.of(),
queryInfo.getSelf(),
Optional.empty(),
Optional.empty(),
Optional.empty()),
new QueryStatistics(
ofMillis(0),
Expand Down Expand Up @@ -234,9 +239,9 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
public void queryCompletedEvent(QueryInfo queryInfo)
{
QueryStats queryStats = queryInfo.getQueryStats();
eventListenerManager.queryCompleted(
eventListenerManager.queryCompleted(requiresAnonymizedPlan ->
new QueryCompletedEvent(
createQueryMetadata(queryInfo),
createQueryMetadata(queryInfo, requiresAnonymizedPlan),
createQueryStatistics(queryInfo),
createQueryContext(
queryInfo.getSession(),
Expand All @@ -253,8 +258,9 @@ public void queryCompletedEvent(QueryInfo queryInfo)
logQueryTimeline(queryInfo);
}

private QueryMetadata createQueryMetadata(QueryInfo queryInfo)
private QueryMetadata createQueryMetadata(QueryInfo queryInfo, boolean requiresAnonymizedPlan)
{
Anonymizer anonymizer = requiresAnonymizedPlan ? new CounterBasedAnonymizer() : new NoOpAnonymizer();
return new QueryMetadata(
queryInfo.getQueryId().toString(),
queryInfo.getSession().getTransactionId().map(TransactionId::toString),
Expand All @@ -265,7 +271,8 @@ private QueryMetadata createQueryMetadata(QueryInfo queryInfo)
queryInfo.getReferencedTables(),
queryInfo.getRoutines(),
queryInfo.getSelf(),
createTextQueryPlan(queryInfo),
createTextQueryPlan(queryInfo, anonymizer),
createJsonQueryPlan(queryInfo, anonymizer),
queryInfo.getOutputStage().flatMap(stage -> stageInfoCodec.toJsonWithLengthLimit(stage, maxJsonLimit)));
}

Expand Down Expand Up @@ -346,7 +353,7 @@ private QueryContext createQueryContext(SessionRepresentation session, Optional<
retryPolicy.toString());
}

private Optional<String> createTextQueryPlan(QueryInfo queryInfo)
private Optional<String> createTextQueryPlan(QueryInfo queryInfo, Anonymizer anonymizer)
{
try {
if (queryInfo.getOutputStage().isPresent()) {
Expand All @@ -355,7 +362,7 @@ private Optional<String> createTextQueryPlan(QueryInfo queryInfo)
queryInfo.getQueryStats(),
new ValuePrinter(metadata, functionManager, queryInfo.getSession().toSession(sessionPropertyManager)),
false,
new NoOpAnonymizer()));
anonymizer));
}
}
catch (Exception e) {
Expand All @@ -366,6 +373,26 @@ private Optional<String> createTextQueryPlan(QueryInfo queryInfo)
return Optional.empty();
}

private Optional<String> createJsonQueryPlan(QueryInfo queryInfo, Anonymizer anonymizer)
{
try {
if (queryInfo.getOutputStage().isPresent()) {
return Optional.of(jsonDistributedPlan(
queryInfo.getOutputStage().get(),
queryInfo.getSession().toSession(sessionPropertyManager),
metadata,
functionManager,
anonymizer));
}
}
catch (Exception e) {
// Sometimes it is expected to fail. For example if generated plan is too long.
// Don't fail to create event if the plan cannot be created.
log.warn(e, "Error creating anonymized json plan for query %s", queryInfo.getQueryId());
}
return Optional.empty();
}

private static QueryIOMetadata getQueryIOMetadata(QueryInfo queryInfo)
{
Multimap<FragmentNode, OperatorStats> planNodeStats = extractPlanNodeStats(queryInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.eventlistener;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -131,14 +132,15 @@ private static Map<String, String> loadEventListenerProperties(File configFile)
}
}

public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
public void queryCompleted(Function<Boolean, QueryCompletedEvent> queryCompletedEventProvider)
{
for (EventListener listener : configuredEventListeners.get()) {
QueryCompletedEvent event = queryCompletedEventProvider.apply(listener.requiresAnonymizedPlan());
try {
listener.queryCompleted(queryCompletedEvent);
listener.queryCompleted(event);
}
catch (Throwable e) {
log.warn(e, "Failed to publish QueryCompletedEvent for query %s", queryCompletedEvent.getMetadata().getQueryId());
log.warn(e, "Failed to publish QueryCompletedEvent for query %s", event.getMetadata().getQueryId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public String render(PlanRepresentation plan)
return CODEC.toJson(renderJson(plan, plan.getRoot()));
}

private JsonRenderedNode renderJson(PlanRepresentation plan, NodeRepresentation node)
protected JsonRenderedNode renderJson(PlanRepresentation plan, NodeRepresentation node)
{
List<JsonRenderedNode> children = node.getChildren().stream()
.map(plan::getNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
import io.airlift.json.JsonCodec;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.cost.PlanCostEstimate;
Expand Down Expand Up @@ -145,13 +146,15 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.json.JsonCodec.mapJsonCodec;
import static io.trino.execution.StageInfo.getAllStages;
import static io.trino.metadata.ResolvedFunction.extractFunctionName;
import static io.trino.server.DynamicFilterService.DynamicFilterDomainStats;
import static io.trino.sql.DynamicFilters.extractDynamicFilters;
import static io.trino.sql.ExpressionUtils.combineConjunctsWithDuplicates;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static io.trino.sql.planner.plan.JoinNode.Type.INNER;
import static io.trino.sql.planner.planprinter.JsonRenderer.JsonRenderedNode;
import static io.trino.sql.planner.planprinter.PlanNodeStatsSummarizer.aggregateStageStats;
import static io.trino.sql.planner.planprinter.TextRenderer.formatDouble;
import static io.trino.sql.planner.planprinter.TextRenderer.formatPositions;
Expand All @@ -169,6 +172,9 @@

public class PlanPrinter
{
private static final JsonCodec<Map<PlanFragmentId, JsonRenderedNode>> DISTRIBUTED_PLAN_CODEC =
mapJsonCodec(PlanFragmentId.class, JsonRenderedNode.class);

private final PlanRepresentation representation;
private final Function<TableScanNode, TableInfo> tableInfoSupplier;
private final Map<DynamicFilterId, DynamicFilterDomainStats> dynamicFilterDomainStats;
Expand Down Expand Up @@ -230,6 +236,11 @@ String toJson()
return new JsonRenderer().render(representation);
}

JsonRenderedNode toJsonRenderedNode()
{
return new JsonRenderer().renderJson(representation, representation.getRoot());
}

public static String jsonFragmentPlan(PlanNode root, Map<Symbol, Type> symbols, Metadata metadata, FunctionManager functionManager, Session session)
{
TypeProvider typeProvider = TypeProvider.copyOf(symbols.entrySet().stream()
Expand Down Expand Up @@ -272,6 +283,43 @@ public static String jsonLogicalPlan(
.toJson();
}

public static String jsonDistributedPlan(
StageInfo outputStageInfo,
Session session,
Metadata metadata,
FunctionManager functionManager,
Anonymizer anonymizer)
{
List<StageInfo> allStages = getAllStages(Optional.of(outputStageInfo));
TypeProvider types = getTypeProvider(allStages.stream()
.map(StageInfo::getPlan)
.collect(toImmutableList()));
Map<PlanNodeId, TableInfo> tableInfos = allStages.stream()
.map(StageInfo::getTables)
.map(Map::entrySet)
.flatMap(Collection::stream)
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

ValuePrinter valuePrinter = new ValuePrinter(metadata, functionManager, session);

Map<PlanFragmentId, JsonRenderedNode> anonymizedPlan = allStages.stream()
.map(StageInfo::getPlan)
.filter(Objects::nonNull)
.collect(toImmutableMap(
PlanFragment::getId,
planFragment -> new PlanPrinter(
planFragment.getRoot(),
types,
tableScanNode -> tableInfos.get(tableScanNode.getId()),
ImmutableMap.of(),
valuePrinter,
planFragment.getStatsAndCosts(),
Optional.empty(),
anonymizer)
.toJsonRenderedNode()));
return DISTRIBUTED_PLAN_CODEC.toJson(anonymizedPlan);
}

public static String textLogicalPlan(
PlanNode plan,
TypeProvider types,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.testing;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.trino.eventlistener.EventListenerConfig;
Expand Down Expand Up @@ -57,10 +58,10 @@ public void addEventListener(EventListener eventListener)
}

@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
public void queryCompleted(Function<Boolean, QueryCompletedEvent> queryCompletedEventProvider)
{
for (EventListener listener : configuredEventListeners) {
listener.queryCompleted(queryCompletedEvent);
listener.queryCompleted(queryCompletedEventProvider.apply(listener.requiresAnonymizedPlan()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
}

/**
* Specify whether the plan included in QueryCompletedEvent should be anonymized or not
*/
default boolean requiresAnonymizedPlan()
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class QueryMetadata
private final List<RoutineInfo> routines;

private final Optional<String> plan;
private final Optional<String> jsonPlan;

private final Optional<String> payload;

Expand All @@ -56,6 +57,7 @@ public QueryMetadata(
List<RoutineInfo> routines,
URI uri,
Optional<String> plan,
Optional<String> jsonPlan,
Optional<String> payload)
{
this.queryId = requireNonNull(queryId, "queryId is null");
Expand All @@ -68,6 +70,7 @@ public QueryMetadata(
this.routines = requireNonNull(routines, "routines is null");
this.uri = requireNonNull(uri, "uri is null");
this.plan = requireNonNull(plan, "plan is null");
this.jsonPlan = requireNonNull(jsonPlan, "jsonPlan is null");
this.payload = requireNonNull(payload, "payload is null");
}

Expand Down Expand Up @@ -131,6 +134,12 @@ public Optional<String> getPlan()
return plan;
}

@JsonProperty
public Optional<String> getJsonPlan()
{
return jsonPlan;
}

@JsonProperty
public Optional<String> getPayload()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class TestSpiBackwardCompatibility
.put("393", "Method: public abstract java.util.function.ToIntFunction<io.trino.spi.connector.ConnectorSplit> io.trino.spi.connector.ConnectorNodePartitioningProvider.getSplitBucketFunction(io.trino.spi.connector.ConnectorTransactionHandle,io.trino.spi.connector.ConnectorSession,io.trino.spi.connector.ConnectorPartitioningHandle)")
.put("394", "Method: public abstract io.trino.spi.exchange.Exchange io.trino.spi.exchange.ExchangeManager.createExchange(io.trino.spi.exchange.ExchangeContext,int)")
.put("394", "Method: public abstract io.trino.spi.exchange.ExchangeSink io.trino.spi.exchange.ExchangeManager.createSink(io.trino.spi.exchange.ExchangeSinkInstanceHandle,boolean)")
.put("394", "Constructor: public io.trino.spi.eventlistener.QueryMetadata(java.lang.String,java.util.Optional<java.lang.String>,java.lang.String,java.util.Optional<java.lang.String>,java.util.Optional<java.lang.String>,java.lang.String,java.util.List<io.trino.spi.eventlistener.TableInfo>,java.util.List<io.trino.spi.eventlistener.RoutineInfo>,java.net.URI,java.util.Optional<java.lang.String>,java.util.Optional<java.lang.String>)")
.build();

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,12 @@ public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
delegate.splitCompleted(splitCompletedEvent);
}
}

@Override
public boolean requiresAnonymizedPlan()
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.requiresAnonymizedPlan();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public class TestHttpEventListener
List.of(),
List.of(),
URI.create("http://localhost"),
Optional.empty(), Optional.empty());
Optional.empty(),
Optional.empty(),
Optional.empty());

splitStatistics = new SplitStatistics(
ofMillis(1000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sq
return runQueryAndWaitForEvents(sql, session, Optional.empty());
}

MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql, Session session, boolean requireAnonymizedPlan)
throws Exception
{
eventsCollector.setRequiresAnonymizedPlan(requireAnonymizedPlan);
return runQueryAndWaitForEvents(sql, session, Optional.empty());
}

MaterializedResultWithEvents runQueryAndWaitForEvents(@Language("SQL") String sql, Session session, Optional<String> expectedExceptionRegEx)
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -36,6 +37,7 @@
final class EventsCollector
{
private final ConcurrentHashMap<QueryId, QueryEvents> queryEvents = new ConcurrentHashMap<>();
private final AtomicBoolean requiresAnonymizedPlan = new AtomicBoolean(false);

public synchronized void addQueryCreated(QueryCreatedEvent event)
{
Expand All @@ -52,6 +54,16 @@ public synchronized void addSplitCompleted(SplitCompletedEvent event)
getQueryEvents(new QueryId(event.getQueryId())).addSplitCompleted(event);
}

public void setRequiresAnonymizedPlan(boolean value)
{
requiresAnonymizedPlan.set(value);
}

public boolean requiresAnonymizedPlan()
{
return requiresAnonymizedPlan.get();
}

public QueryEvents getQueryEvents(QueryId queryId)
{
return queryEvents.computeIfAbsent(queryId, ignored -> new QueryEvents());
Expand Down
Loading

0 comments on commit 7f30476

Please sign in to comment.