From bd39e7bdcfacfef5bb64e2132ba5d2abf50ce99b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sat, 30 Sep 2017 09:39:28 +0200 Subject: [PATCH] Fix code style issues for HBaseIO --- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 200 ++++++++---------- .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 14 +- 2 files changed, 95 insertions(+), 119 deletions(-) diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 393402a53e6a8..bcdaefa14986e 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -159,69 +159,53 @@ public static Read read() { return new Read(null, "", new SerializableScan(new Scan())); } - /** - * A {@link PTransform} that reads from HBase. See the class-level Javadoc on - {@link HBaseIO} for* more information. - * - * @see HBaseIO - */ - public static class Read extends PTransform> { - /** - Reads from the HBase instance - indicated by the* given configuration.*/ - - public Read withConfiguration(Configuration configuration) { - checkArgument(configuration != null, "configuration can not be null"); - return new Read(new SerializableConfiguration(configuration), - tableId, serializableScan); - } - - /** - Reads from the specified table.*/ - - public Read withTableId(String tableId) { - checkArgument(tableId != null, "tableIdcan not be null"); - return new Read(serializableConfiguration, tableId, serializableScan); - } - - /** - Filters the rows read from HBase - using the given* scan.*/ - - public Read withScan(Scan scan) { - checkArgument(scan != null, "scancan not be null"); - return new Read(serializableConfiguration, tableId, new SerializableScan(scan)); - } - - /** - Filters the rows read from HBase - using the given* row filter.*/ + /** + * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for* + * more information. + * + * @see HBaseIO + */ + public static class Read extends PTransform> { + /** Reads from the HBase instance indicated by the* given configuration. */ + public Read withConfiguration(Configuration configuration) { + checkArgument(configuration != null, "configuration can not be null"); + return new Read(new SerializableConfiguration(configuration), tableId, serializableScan); + } - public Read withFilter(Filter filter) { - checkArgument(filter != null, "filtercan not be null"); - return withScan(serializableScan.get().setFilter(filter)); - } + /** Reads from the specified table. */ + public Read withTableId(String tableId) { + checkArgument(tableId != null, "tableIdcan not be null"); + return new Read(serializableConfiguration, tableId, serializableScan); + } - /** - Reads only rows in the specified range.*/ + /** Filters the rows read from HBase using the given* scan. */ + public Read withScan(Scan scan) { + checkArgument(scan != null, "scancan not be null"); + return new Read(serializableConfiguration, tableId, new SerializableScan(scan)); + } - public Read withKeyRange(ByteKeyRange keyRange) { - checkArgument(keyRange != null, "keyRangecan not be null"); - byte[] startRow = keyRange.getStartKey().getBytes(); - byte[] stopRow = keyRange.getEndKey().getBytes(); - return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow)); - } + /** Filters the rows read from HBase using the given* row filter. */ + public Read withFilter(Filter filter) { + checkArgument(filter != null, "filtercan not be null"); + return withScan(serializableScan.get().setFilter(filter)); + } - /** - Reads only rows in the specified range.*/ + /** Reads only rows in the specified range. */ + public Read withKeyRange(ByteKeyRange keyRange) { + checkArgument(keyRange != null, "keyRangecan not be null"); + byte[] startRow = keyRange.getStartKey().getBytes(); + byte[] stopRow = keyRange.getEndKey().getBytes(); + return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow)); + } - public Read withKeyRange(byte[] startRow, byte[] stopRow) { - checkArgument(startRow != null, "startRowcan not be null"); - checkArgument(stopRow != null, "stopRowcan not be null"); - ByteKeyRange keyRange = - ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); - return withKeyRange(keyRange); - } + /** Reads only rows in the specified range. */ + public Read withKeyRange(byte[] startRow, byte[] stopRow) { + checkArgument(startRow != null, "startRowcan not be null"); + checkArgument(stopRow != null, "stopRowcan not be null"); + ByteKeyRange keyRange = + ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); + return withKeyRange(keyRange); + } private Read( SerializableConfiguration serializableConfiguration, @@ -232,22 +216,21 @@ private Read( this.serializableScan = serializableScan; } - @Override - public PCollection expand(PBegin input) { - checkArgument(serializableConfiguration != null, - "withConfiguration() is required"); - checkArgument(!tableId.isEmpty(), "withTableId() is required"); - try (Connection connection = ConnectionFactory.createConnection( - serializableConfiguration.get())) { - Admin admin = connection.getAdmin(); - checkArgument(admin.tableExists(TableName.valueOf(tableId)), - "Table %s does not exist", tableId); - } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */); - return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); - } + @Override + public PCollection expand(PBegin input) { + checkArgument(serializableConfiguration != null, "withConfiguration() is required"); + checkArgument(!tableId.isEmpty(), "withTableId() is required"); + try (Connection connection = + ConnectionFactory.createConnection(serializableConfiguration.get())) { + Admin admin = connection.getAdmin(); + checkArgument( + admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */); + return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); + } @Override public void populateDisplayData(DisplayData.Builder builder) { @@ -597,50 +580,45 @@ public static Write write() { return new Write(null /* SerializableConfiguration */, ""); } - /** - * A {@link PTransform} that writes to HBase. See the class-level Javadoc on - {@link HBaseIO} for* more information. - * - * @see HBaseIO - */ - public static class Write extends PTransform, PDone> { - /** - Writes to the HBase instance - indicated by the* given Configuration. - */ - public Write withConfiguration(Configuration configuration) { - checkArgument(configuration != null, "configuration can not be null"); - return new Write(new SerializableConfiguration(configuration), tableId); - } - - /** - Writes to the specified table.*/ + /** + * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for* + * more information. + * + * @see HBaseIO + */ + public static class Write extends PTransform, PDone> { + /** Writes to the HBase instance indicated by the* given Configuration. */ + public Write withConfiguration(Configuration configuration) { + checkArgument(configuration != null, "configuration can not be null"); + return new Write(new SerializableConfiguration(configuration), tableId); + } - public Write withTableId(String tableId) { - checkArgument(tableId != null, "tableIdcan not be null"); - return new Write(serializableConfiguration, tableId); - } + /** Writes to the specified table. */ + public Write withTableId(String tableId) { + checkArgument(tableId != null, "tableIdcan not be null"); + return new Write(serializableConfiguration, tableId); + } private Write(SerializableConfiguration serializableConfiguration, String tableId) { this.serializableConfiguration = serializableConfiguration; this.tableId = tableId; } - @Override - public PDone expand(PCollection input) { - checkArgument(serializableConfiguration != null, "withConfiguration() is required"); - checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required"); - try (Connection connection = ConnectionFactory.createConnection( - serializableConfiguration.get())) { - Admin admin = connection.getAdmin(); - checkArgument(admin.tableExists(TableName.valueOf(tableId)), - "Table %s does not exist", tableId); - } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); - return PDone.in(input.getPipeline()); - } + @Override + public PDone expand(PCollection input) { + checkArgument(serializableConfiguration != null, "withConfiguration() is required"); + checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required"); + try (Connection connection = + ConnectionFactory.createConnection(serializableConfiguration.get())) { + Admin admin = connection.getAdmin(); + checkArgument( + admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); + return PDone.in(input.getPipeline()); + } @Override public void populateDisplayData(DisplayData.Builder builder) { diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index 73ba64be9e4e2..fd420249412da 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -355,14 +355,12 @@ public void testWriting() throws Exception { public void testWritingFailsTableDoesNotExist() throws Exception { final String table = "TEST-TABLE-DOES-NOT-EXIST"; - - - // Exception will be thrown by write.expand() when write is applied. - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(String.format("Table %s does not exist", table)); - p.apply(Create.empty(HBaseMutationCoder.of())) - .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); - } + // Exception will be thrown by write.expand() when write is applied. + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(String.format("Table %s does not exist", table)); + p.apply(Create.empty(HBaseMutationCoder.of())) + .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); + } /** Tests that when writing an element fails, the write fails. */ @Test