Skip to content

Commit

Permalink
Fixing an issue in the HBaseMutationCoder. (#990)
Browse files Browse the repository at this point in the history
PutAdapter is used by HBaseMutation coder and PutAdapter has a bit of a hack to set the timestamp on the client side.  That breaks Dataflow's requirement of having a consistent serialization of objects.  We've been telling users to set client side timestamps when they hit this obscure problem.  This PR fixes the serialization to not add the client side timestamp for the Dataflow serialization, thereby fixing the underlying problem.
  • Loading branch information
sduskis committed Aug 29, 2016
1 parent 89eb289 commit cc4217c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@
public class HBaseMutationCoder extends AtomicCoder<Mutation> {

private static final long serialVersionUID = -3853654063196018580L;
private static final PutAdapter PUT_ADAPTER = new PutAdapter(Integer.MAX_VALUE);

// Don't force the time setting in the PutAdapter, since that can lead to inconsistent
// encoding/decoding over time, which can cause Dataflow's MutationDetector to say that the
// encoding is invalid.
final static PutAdapter PUT_ADAPTER = new PutAdapter(Integer.MAX_VALUE, false);

@Override
public void encode(Mutation mutation, OutputStream outStream, Coder.Context context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,65 @@

import static org.apache.hadoop.hbase.util.Bytes.toBytes;

import com.google.bigtable.repackaged.com.google.api.client.util.Clock;
import com.google.cloud.dataflow.sdk.util.MutationDetectors;
import java.io.IOException;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
* Tests for {@link HBaseMutationCoder}.
*/
public class HBaseMutationCoderTest {

private HBaseMutationCoder underTest = new HBaseMutationCoder();
private HBaseMutationCoder underTest;
private AtomicLong time;

@Before
public void setup() {
underTest = new HBaseMutationCoder();
time = new AtomicLong(System.currentTimeMillis());

HBaseMutationCoder.PUT_ADAPTER.clock = new Clock(){
@Override
public long currentTimeMillis() {
return time.get();
}
};
}

@After
public void tearDown() {
HBaseMutationCoder.PUT_ADAPTER.clock = Clock.SYSTEM;
}

@Test
public void testPut() throws IOException {
Put original = new Put(toBytes("key")).addColumn(toBytes("family"), toBytes("column"), toBytes("value"));
Assert.assertEquals(0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original)));
Put original =
new Put(toBytes("key")).addColumn(toBytes("family"), toBytes("column"), toBytes("value"));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(
0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original)));
time.set(time.get() + 10_000);
Assert.assertEquals(
0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original)));
MutationDetectors.forValueWithCoder(original, underTest).verifyUnmodified();
}
}

@Test
public void testDelete() throws IOException {
Delete original = new Delete(toBytes("key"));
Assert.assertEquals(0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original)));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(
0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original)));
time.set(time.get() + 10_000);
MutationDetectors.forValueWithCoder(original, underTest).verifyUnmodified();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.hbase.adapters;

import com.google.api.client.util.Clock;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Mutation.MutationCase;
Expand Down Expand Up @@ -42,6 +43,7 @@
public class PutAdapter implements OperationAdapter<Put, MutateRowRequest.Builder> {
private final int maxKeyValueSize;
private final boolean setClientTimestamp;
public Clock clock = Clock.SYSTEM;

/**
* <p>Constructor for PutAdapter.</p>
Expand Down Expand Up @@ -74,7 +76,7 @@ public MutateRowRequest.Builder adapt(Put operation) {

// Bigtable uses a 1ms granularity. Use this timestamp if the Put does not have one specified to
// make mutations idempotent.
long currentTimestampMicros = setClientTimestamp ? System.currentTimeMillis() * 1000 : -1;
long currentTimestampMicros = setClientTimestamp ? clock.currentTimeMillis() * 1000 : -1;
final int rowLength = operation.getRow().length;

List<Mutation> mutations = new ArrayList<>();
Expand Down

0 comments on commit cc4217c

Please sign in to comment.