diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java index e71dbee0b7..f568422942 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/AvroBucketMetadata.java @@ -135,12 +135,12 @@ public Map, Coder> coderOverrides() { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java index 9749bfb768..10299f22df 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/BucketMetadata.java @@ -327,9 +327,9 @@ public Set> compatibleMetadataTypes() { public abstract K2 extractKeySecondary(V value); - public abstract int hashPrimaryKeyMetadata(); + abstract int hashPrimaryKeyMetadata(); - public abstract int hashSecondaryKeyMetadata(); + abstract int hashSecondaryKeyMetadata(); public SortedBucketIO.ComparableKeyBytes primaryComparableKeyBytes(V value) { return new SortedBucketIO.ComparableKeyBytes(getKeyBytesPrimary(value), null); diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java index 4b41a3bf6a..d986cf09be 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/JsonBucketMetadata.java @@ -148,12 +148,12 @@ public void populateDisplayData(Builder builder) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java index 2c828efb02..dbe2aeeeb9 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetBucketMetadata.java @@ -192,12 +192,12 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java index bbd1924cce..2db687bb06 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/SortedBucketSource.java @@ -495,7 +495,9 @@ public Predicate getPredicate() { } public Coder getCoder() { - return directories.entrySet().iterator().next().getValue().getValue().getCoder(); + final KV> sampledSource = + directories.entrySet().iterator().next().getValue(); + return sampledSource.getValue().getCoder(); } static CoGbkResultSchema schemaOf(List> sources) { @@ -523,13 +525,13 @@ long getOrSampleByteSize() { entry -> { // Take at most 10 buckets from the directory to sample // Check for single-shard filenames template first, then multi-shard + final String filenameSuffix = entry.getValue().getKey(); List sampledFiles = - sampleDirectory(entry.getKey(), "*-0000?-of-?????" + entry.getValue().getKey()); + sampleDirectory(entry.getKey(), "*-0000?-of-?????" + filenameSuffix); if (sampledFiles.isEmpty()) { sampledFiles = sampleDirectory( - entry.getKey(), - "*-0000?-of-*-shard-00000-of-?????" + entry.getValue().getKey()); + entry.getKey(), "*-0000?-of-*-shard-00000-of-?????" + filenameSuffix); } int numBuckets = 0; diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java index 0611b4288d..8b5255bb59 100644 --- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java +++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/TensorFlowBucketMetadata.java @@ -159,12 +159,12 @@ public void populateDisplayData(Builder builder) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyField, getKeyClass()); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyFieldSecondary, getKeyClassSecondary()); } } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java index 0396c4b26e..0e34b19f8e 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/BucketMetadataTest.java @@ -75,12 +75,12 @@ public Void extractKeySecondary(Object value) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return -1; } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return -1; } }); diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java index 6dc01e0edb..106c98c16b 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadata.java @@ -99,12 +99,12 @@ public Void extractKeySecondary(final String value) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(getClass(), keyIndex); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { throw new IllegalArgumentException(); } } diff --git a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java index 8a4d02b4d6..d2a0bb9eb0 100644 --- a/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java +++ b/scio-smb/src/test/java/org/apache/beam/sdk/extensions/smb/TestBucketMetadataWithSecondary.java @@ -97,12 +97,12 @@ public String extractKeySecondary(final String value) { } @Override - public int hashPrimaryKeyMetadata() { + int hashPrimaryKeyMetadata() { return Objects.hash(keyIndex); } @Override - public int hashSecondaryKeyMetadata() { + int hashSecondaryKeyMetadata() { return Objects.hash(keyIndexSecondary); } }