From d49ed1c60dea9c1f3fbb99ad0c00b32ac8c8dfaa Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Wed, 13 Oct 2021 08:46:00 +0300 Subject: [PATCH] Allow getting metrics after the operator is closed In case the metrics are updated when close() is called, the metrics' handling code would be simpler. --- .../WorkProcessorPipelineSourceOperator.java | 5 ++++ .../WorkProcessorSourceOperatorAdapter.java | 2 +- ...stWorkProcessorPipelineSourceOperator.java | 25 ++++++++++++++----- .../spi/connector/ConnectorPageSource.java | 1 + .../memory/MemoryPageSourceProvider.java | 4 ++- 5 files changed, 29 insertions(+), 8 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java index 98522e50deb9..e276e48e4f17 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java @@ -538,6 +538,11 @@ private void closeOperators(int lastOperatorIndex) operatorContext.getDriverContext().getTaskId()); } finally { + workProcessorOperatorContext.metrics.set(operator.getMetrics()); + if (operator instanceof WorkProcessorSourceOperator) { + WorkProcessorSourceOperator sourceOperator = (WorkProcessorSourceOperator) operator; + workProcessorOperatorContext.connectorMetrics.set(sourceOperator.getConnectorMetrics()); + } workProcessorOperatorContext.memoryTrackingContext.close(); workProcessorOperatorContext.finalOperatorInfo = operator.getOperatorInfo().orElse(null); workProcessorOperatorContext.operator = null; diff --git a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java index 683403f0cc9a..b70cc6d27562 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java +++ b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorSourceOperatorAdapter.java @@ -176,9 +176,9 @@ public boolean isFinished() public void close() throws Exception { + sourceOperator.close(); operatorContext.setLatestMetrics(sourceOperator.getMetrics()); operatorContext.setLatestConnectorMetrics(sourceOperator.getConnectorMetrics()); - sourceOperator.close(); } private void updateOperatorStats() diff --git a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java index aeb29ea2391a..e696d042b702 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java @@ -208,8 +208,12 @@ public void testWorkProcessorPipelineSourceOperator() // assert source operator stats are correct OperatorStats sourceOperatorStats = operatorStats.get(0); - assertEquals(sourceOperatorStats.getMetrics().getMetrics(), ImmutableMap.of("testSourceMetric", new LongCount(1))); - assertEquals(sourceOperatorStats.getConnectorMetrics().getMetrics(), ImmutableMap.of("testSourceConnectorMetric", new LongCount(2))); + assertEquals(sourceOperatorStats.getMetrics().getMetrics(), ImmutableMap.of( + "testSourceMetric", new LongCount(1), + "testSourceClosed", new LongCount(1))); + assertEquals(sourceOperatorStats.getConnectorMetrics().getMetrics(), ImmutableMap.of( + "testSourceConnectorMetric", new LongCount(2), + "testSourceConnectorClosed", new LongCount(1))); assertEquals(sourceOperatorStats.getDynamicFilterSplitsProcessed(), 42L); @@ -239,8 +243,12 @@ public void testWorkProcessorPipelineSourceOperator() // assert pipeline metrics List operatorSummaries = pipelineStats.getOperatorSummaries(); - assertEquals(operatorSummaries.get(0).getMetrics().getMetrics(), ImmutableMap.of("testSourceMetric", new LongCount(1))); - assertEquals(operatorSummaries.get(0).getConnectorMetrics().getMetrics(), ImmutableMap.of("testSourceConnectorMetric", new LongCount(2))); + assertEquals(operatorSummaries.get(0).getMetrics().getMetrics(), ImmutableMap.of( + "testSourceMetric", new LongCount(1), + "testSourceClosed", new LongCount(1))); + assertEquals(operatorSummaries.get(0).getConnectorMetrics().getMetrics(), ImmutableMap.of( + "testSourceConnectorMetric", new LongCount(2), + "testSourceConnectorClosed", new LongCount(1))); assertEquals(operatorSummaries.get(1).getMetrics().getMetrics(), ImmutableMap.of("testOperatorMetric", new LongCount(1))); } @@ -425,13 +433,18 @@ public long getDynamicFilterSplitsProcessed() @Override public Metrics getMetrics() { - return new Metrics(ImmutableMap.of("testSourceMetric", new LongCount(1))); + System.err.println("closed: " + closed); + return new Metrics(ImmutableMap.of( + "testSourceMetric", new LongCount(1), + "testSourceClosed", new LongCount(closed ? 1 : 0))); } @Override public Metrics getConnectorMetrics() { - return new Metrics(ImmutableMap.of("testSourceConnectorMetric", new LongCount(2))); + return new Metrics(ImmutableMap.of( + "testSourceConnectorMetric", new LongCount(2), + "testSourceConnectorClosed", new LongCount(closed ? 1 : 0))); } @Override diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java index c9ce8bb12d7c..995562cb6c22 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSource.java @@ -87,6 +87,7 @@ default CompletableFuture isBlocked() * Returns the connector's metrics, mapping a metric ID to its latest value. * Each call must return an immutable snapshot of available metrics. * Same ID metrics are merged across all tasks and exposed via OperatorStats. + * This method can be called after the page source is closed. */ default Metrics getMetrics() { diff --git a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSourceProvider.java b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSourceProvider.java index 6cced43fa469..ee4ce4c3ed45 100644 --- a/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSourceProvider.java +++ b/plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSourceProvider.java @@ -95,6 +95,7 @@ private static class DynamicFilteringPageSource private final boolean enableLazyDynamicFiltering; private long rows; private long completedPositions; + private boolean closed; private DynamicFilteringPageSource(FixedPageSource delegate, List columns, DynamicFilter dynamicFilter, boolean enableLazyDynamicFiltering) { @@ -171,6 +172,7 @@ public long getSystemMemoryUsage() public void close() { delegate.close(); + closed = true; } @Override @@ -178,7 +180,7 @@ public Metrics getMetrics() { return new Metrics(ImmutableMap.of( "rows", new LongCount(rows), - "finished", new LongCount(isFinished() ? 1 : 0), + "finished", new LongCount(closed ? 1 : 0), "started", new LongCount(1))); } }