diff --git a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java index d72c5b3600..2408982d1c 100644 --- a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java +++ b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java @@ -41,7 +41,11 @@ public class HBaseMutationCoder extends AtomicCoder { private static final long serialVersionUID = -3853654063196018580L; - private static final PutAdapter PUT_ADAPTER = new PutAdapter(Integer.MAX_VALUE); + + // Don't force the time setting in the PutAdapter, since that can lead to inconsistent + // encoding/decoding over time, which can cause Dataflow's MutationDetector to say that the + // encoding is invalid. 
+ static final PutAdapter PUT_ADAPTER = new PutAdapter(Integer.MAX_VALUE, false); @Override public void encode(Mutation mutation, OutputStream outStream, Coder.Context context) diff --git a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java index c9b72dd500..46363bd0b2 100644 --- a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java +++ b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java @@ -17,11 +17,15 @@ import static org.apache.hadoop.hbase.util.Bytes.toBytes; +import com.google.bigtable.repackaged.com.google.api.client.util.Clock; +import com.google.cloud.dataflow.sdk.util.MutationDetectors; import java.io.IOException; - +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** @@ -29,17 +33,49 @@ */ public class HBaseMutationCoderTest { - private HBaseMutationCoder underTest = new HBaseMutationCoder(); + private HBaseMutationCoder underTest; + private AtomicLong time; + + @Before + public void setup() { + underTest = new HBaseMutationCoder(); + time = new AtomicLong(System.currentTimeMillis()); + + HBaseMutationCoder.PUT_ADAPTER.clock = new Clock(){ + @Override + public long currentTimeMillis() { + return time.get(); + } + }; + } + + @After + public void tearDown() { + HBaseMutationCoder.PUT_ADAPTER.clock = Clock.SYSTEM; + } @Test public void testPut() throws IOException { - Put original = new Put(toBytes("key")).addColumn(toBytes("family"), toBytes("column"), toBytes("value")); - Assert.assertEquals(0, 
original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + Put original = + new Put(toBytes("key")).addColumn(toBytes("family"), toBytes("column"), toBytes("value")); + for (int i = 0; i < 5; i++) { + Assert.assertEquals( + 0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + time.set(time.get() + 10_000); + Assert.assertEquals( + 0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + MutationDetectors.forValueWithCoder(original, underTest).verifyUnmodified(); + } } @Test public void testDelete() throws IOException { Delete original = new Delete(toBytes("key")); - Assert.assertEquals(0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + for (int i = 0; i < 5; i++) { + Assert.assertEquals( + 0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + time.set(time.get() + 10_000); + MutationDetectors.forValueWithCoder(original, underTest).verifyUnmodified(); + } } } diff --git a/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java b/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java index 663b998139..9e763fa03f 100644 --- a/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java +++ b/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.hbase.adapters; +import com.google.api.client.util.Clock; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.Mutation.MutationCase; @@ -42,6 +43,7 @@ public class PutAdapter implements OperationAdapter { private final int maxKeyValueSize; private final boolean setClientTimestamp; + public Clock clock = Clock.SYSTEM; /** *

Constructor for PutAdapter.

@@ -74,7 +76,7 @@ public MutateRowRequest.Builder adapt(Put operation) { // Bigtable uses a 1ms granularity. Use this timestamp if the Put does not have one specified to // make mutations idempotent. - long currentTimestampMicros = setClientTimestamp ? System.currentTimeMillis() * 1000 : -1; + long currentTimestampMicros = setClientTimestamp ? clock.currentTimeMillis() * 1000 : -1; final int rowLength = operation.getRow().length; List mutations = new ArrayList<>();