Skip to content

Commit

Permalink
This closes #3932
Browse files Browse the repository at this point in the history
  • Loading branch information
iemejia committed Oct 3, 2017
2 parents 4a5b3c0 + bd39e7b commit 81d304d
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,69 +159,53 @@ public static Read read() {
return new Read(null, "", new SerializableScan(new Scan()));
}

/**
* A {@link PTransform} that reads from HBase. See the class-level Javadoc on
{@link HBaseIO} for* more information.
*
* @see HBaseIO
*/
public static class Read extends PTransform<PBegin, PCollection<Result>> {
/**
Reads from the HBase instance
indicated by the* given configuration.*/

public Read withConfiguration(Configuration configuration) {
checkArgument(configuration != null, "configuration can not be null");
return new Read(new SerializableConfiguration(configuration),
tableId, serializableScan);
}

/**
Reads from the specified table.*/

public Read withTableId(String tableId) {
checkArgument(tableId != null, "tableIdcan not be null");
return new Read(serializableConfiguration, tableId, serializableScan);
}

/**
Filters the rows read from HBase
using the given* scan.*/

public Read withScan(Scan scan) {
checkArgument(scan != null, "scancan not be null");
return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
}

/**
Filters the rows read from HBase
using the given* row filter.*/
/**
* A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for*
* more information.
*
* @see HBaseIO
*/
public static class Read extends PTransform<PBegin, PCollection<Result>> {
/** Reads from the HBase instance indicated by the* given configuration. */
public Read withConfiguration(Configuration configuration) {
checkArgument(configuration != null, "configuration can not be null");
return new Read(new SerializableConfiguration(configuration), tableId, serializableScan);
}

public Read withFilter(Filter filter) {
checkArgument(filter != null, "filtercan not be null");
return withScan(serializableScan.get().setFilter(filter));
}
/** Reads from the specified table. */
public Read withTableId(String tableId) {
checkArgument(tableId != null, "tableIdcan not be null");
return new Read(serializableConfiguration, tableId, serializableScan);
}

/**
Reads only rows in the specified range.*/
/** Filters the rows read from HBase using the given* scan. */
public Read withScan(Scan scan) {
checkArgument(scan != null, "scancan not be null");
return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
}

public Read withKeyRange(ByteKeyRange keyRange) {
checkArgument(keyRange != null, "keyRangecan not be null");
byte[] startRow = keyRange.getStartKey().getBytes();
byte[] stopRow = keyRange.getEndKey().getBytes();
return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
}
/** Filters the rows read from HBase using the given* row filter. */
public Read withFilter(Filter filter) {
checkArgument(filter != null, "filtercan not be null");
return withScan(serializableScan.get().setFilter(filter));
}

/**
Reads only rows in the specified range.*/
/** Reads only rows in the specified range. */
public Read withKeyRange(ByteKeyRange keyRange) {
checkArgument(keyRange != null, "keyRangecan not be null");
byte[] startRow = keyRange.getStartKey().getBytes();
byte[] stopRow = keyRange.getEndKey().getBytes();
return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
}

public Read withKeyRange(byte[] startRow, byte[] stopRow) {
checkArgument(startRow != null, "startRowcan not be null");
checkArgument(stopRow != null, "stopRowcan not be null");
ByteKeyRange keyRange =
ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
return withKeyRange(keyRange);
}
/** Reads only rows in the specified range. */
public Read withKeyRange(byte[] startRow, byte[] stopRow) {
checkArgument(startRow != null, "startRowcan not be null");
checkArgument(stopRow != null, "stopRowcan not be null");
ByteKeyRange keyRange =
ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
return withKeyRange(keyRange);
}

private Read(
SerializableConfiguration serializableConfiguration,
Expand All @@ -232,22 +216,21 @@ private Read(
this.serializableScan = serializableScan;
}

@Override
public PCollection<Result> expand(PBegin input) {
checkArgument(serializableConfiguration != null,
"withConfiguration() is required");
checkArgument(!tableId.isEmpty(), "withTableId() is required");
try (Connection connection = ConnectionFactory.createConnection(
serializableConfiguration.get())) {
Admin admin = connection.getAdmin();
checkArgument(admin.tableExists(TableName.valueOf(tableId)),
"Table %s does not exist", tableId);
} catch (IOException e) {
LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
}
HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
}
@Override
public PCollection<Result> expand(PBegin input) {
checkArgument(serializableConfiguration != null, "withConfiguration() is required");
checkArgument(!tableId.isEmpty(), "withTableId() is required");
try (Connection connection =
ConnectionFactory.createConnection(serializableConfiguration.get())) {
Admin admin = connection.getAdmin();
checkArgument(
admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
} catch (IOException e) {
LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
}
HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
Expand Down Expand Up @@ -597,50 +580,45 @@ public static Write write() {
return new Write(null /* SerializableConfiguration */, "");
}

/**
* A {@link PTransform} that writes to HBase. See the class-level Javadoc on
{@link HBaseIO} for* more information.
*
* @see HBaseIO
*/
public static class Write extends PTransform<PCollection<Mutation>, PDone> {
/**
Writes to the HBase instance
indicated by the* given Configuration.
*/
public Write withConfiguration(Configuration configuration) {
checkArgument(configuration != null, "configuration can not be null");
return new Write(new SerializableConfiguration(configuration), tableId);
}

/**
Writes to the specified table.*/
/**
* A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for*
* more information.
*
* @see HBaseIO
*/
public static class Write extends PTransform<PCollection<Mutation>, PDone> {
/** Writes to the HBase instance indicated by the* given Configuration. */
public Write withConfiguration(Configuration configuration) {
checkArgument(configuration != null, "configuration can not be null");
return new Write(new SerializableConfiguration(configuration), tableId);
}

public Write withTableId(String tableId) {
checkArgument(tableId != null, "tableIdcan not be null");
return new Write(serializableConfiguration, tableId);
}
/** Writes to the specified table. */
public Write withTableId(String tableId) {
checkArgument(tableId != null, "tableIdcan not be null");
return new Write(serializableConfiguration, tableId);
}

private Write(SerializableConfiguration serializableConfiguration, String tableId) {
this.serializableConfiguration = serializableConfiguration;
this.tableId = tableId;
}

@Override
public PDone expand(PCollection<Mutation> input) {
checkArgument(serializableConfiguration != null, "withConfiguration() is required");
checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required");
try (Connection connection = ConnectionFactory.createConnection(
serializableConfiguration.get())) {
Admin admin = connection.getAdmin();
checkArgument(admin.tableExists(TableName.valueOf(tableId)),
"Table %s does not exist", tableId);
} catch (IOException e) {
LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
}
input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
return PDone.in(input.getPipeline());
}
@Override
public PDone expand(PCollection<Mutation> input) {
checkArgument(serializableConfiguration != null, "withConfiguration() is required");
checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required");
try (Connection connection =
ConnectionFactory.createConnection(serializableConfiguration.get())) {
Admin admin = connection.getAdmin();
checkArgument(
admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
} catch (IOException e) {
LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
}
input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
return PDone.in(input.getPipeline());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,12 @@ public void testWriting() throws Exception {
public void testWritingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE-DOES-NOT-EXIST";



// Exception will be thrown by write.expand() when write is applied.
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(String.format("Table %s does not exist", table));
p.apply(Create.empty(HBaseMutationCoder.of()))
.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
}
// Exception will be thrown by write.expand() when write is applied.
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(String.format("Table %s does not exist", table));
p.apply(Create.empty(HBaseMutationCoder.of()))
.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
}

/** Tests that when writing an element fails, the write fails. */
@Test
Expand Down

0 comments on commit 81d304d

Please sign in to comment.