From 52ab49e72559a70af910bf61e33eb02bb245ee46 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 30 Aug 2024 16:57:14 +0200 Subject: [PATCH] Fix BQ storage stream split (#32376) A variable shadowing was introduced, preventing modification of the stream reader source after splitting. --- .../sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java index e583bdb48178..adc0933defed 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java @@ -347,7 +347,7 @@ public synchronized BigQueryStorageStreamSource getCurrentSource() { // Because superclass cannot have preconditions around these variables, cannot use // @RequiresNonNull Preconditions.checkStateNotNull(responseStream); - BigQueryServerStream responseStream = this.responseStream; + final BigQueryServerStream responseStream = this.responseStream; totalSplitCalls.inc(); LOG.debug( "Received BigQuery Storage API split request for stream {} at fraction {}.", @@ -433,9 +433,9 @@ public synchronized BigQueryStorageStreamSource getCurrentSource() { // Cancels the parent stream before replacing it with the primary stream. responseStream.cancel(); - source = source.fromExisting(splitResponse.getPrimaryStream()); - responseStream = newResponseStream; - responseIterator = newResponseIterator; + this.source = source.fromExisting(splitResponse.getPrimaryStream()); + this.responseStream = newResponseStream; + this.responseIterator = newResponseIterator; reader.resetBuffer(); }