From cc4217ccb2a68332b48ed743a365b9813d4a06e8 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Mon, 29 Aug 2016 17:05:55 -0400 Subject: [PATCH] Fixing an issue in the HBaseMutationCoder. (#990) PutAdapter is used by HBaseMutationCoder, and PutAdapter has a bit of a hack to set the timestamp on the client side. That breaks Dataflow's requirement of having a consistent serialization of objects. We've been telling users to set client-side timestamps when they hit this obscure problem. This PR fixes the serialization to not add the client-side timestamp for the Dataflow serialization, thereby fixing the underlying problem. --- .../dataflow/coders/HBaseMutationCoder.java | 6 ++- .../coders/HBaseMutationCoderTest.java | 46 +++++++++++++++++-- .../bigtable/hbase/adapters/PutAdapter.java | 4 +- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java index d72c5b3600..2408982d1c 100644 --- a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java +++ b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/main/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoder.java @@ -41,7 +41,11 @@ public class HBaseMutationCoder extends AtomicCoder { private static final long serialVersionUID = -3853654063196018580L; - private static final PutAdapter PUT_ADAPTER = new PutAdapter(Integer.MAX_VALUE); + + // Don't force the time setting in the PutAdapter, since that can lead to inconsistent + // encoding/decoding over time, which can cause Dataflow's MutationDetector to say that the + // encoding is invalid. 
+ final static PutAdapter PUT_ADAPTER = new PutAdapter(Integer.MAX_VALUE, false); @Override public void encode(Mutation mutation, OutputStream outStream, Coder.Context context) diff --git a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java index c9b72dd500..46363bd0b2 100644 --- a/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java +++ b/bigtable-dataflow-parent/bigtable-hbase-dataflow/src/test/java/com/google/cloud/bigtable/dataflow/coders/HBaseMutationCoderTest.java @@ -17,11 +17,15 @@ import static org.apache.hadoop.hbase.util.Bytes.toBytes; +import com.google.bigtable.repackaged.com.google.api.client.util.Clock; +import com.google.cloud.dataflow.sdk.util.MutationDetectors; import java.io.IOException; - +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; /** @@ -29,17 +33,49 @@ */ public class HBaseMutationCoderTest { - private HBaseMutationCoder underTest = new HBaseMutationCoder(); + private HBaseMutationCoder underTest; + private AtomicLong time; + + @Before + public void setup() { + underTest = new HBaseMutationCoder(); + time = new AtomicLong(System.currentTimeMillis()); + + HBaseMutationCoder.PUT_ADAPTER.clock = new Clock(){ + @Override + public long currentTimeMillis() { + return time.get(); + } + }; + } + + @After + public void tearDown() { + HBaseMutationCoder.PUT_ADAPTER.clock = Clock.SYSTEM; + } @Test public void testPut() throws IOException { - Put original = new Put(toBytes("key")).addColumn(toBytes("family"), toBytes("column"), toBytes("value")); - Assert.assertEquals(0, 
original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + Put original = + new Put(toBytes("key")).addColumn(toBytes("family"), toBytes("column"), toBytes("value")); + for (int i = 0; i < 5; i++) { + Assert.assertEquals( + 0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + time.set(time.get() + 10_000); + Assert.assertEquals( + 0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + MutationDetectors.forValueWithCoder(original, underTest).verifyUnmodified(); + } } @Test public void testDelete() throws IOException { Delete original = new Delete(toBytes("key")); - Assert.assertEquals(0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + for (int i = 0; i < 5; i++) { + Assert.assertEquals( + 0, original.compareTo(CoderTestUtil.encodeAndDecode(underTest, original))); + time.set(time.get() + 10_000); + MutationDetectors.forValueWithCoder(original, underTest).verifyUnmodified(); + } } } diff --git a/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java b/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java index 663b998139..9e763fa03f 100644 --- a/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java +++ b/bigtable-hbase-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/adapters/PutAdapter.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.hbase.adapters; +import com.google.api.client.util.Clock; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.Mutation.MutationCase; @@ -42,6 +43,7 @@ public class PutAdapter implements OperationAdapter { private final int maxKeyValueSize; private final boolean setClientTimestamp; + public Clock clock = Clock.SYSTEM; /** *

Constructor for PutAdapter.

@@ -74,7 +76,7 @@ public MutateRowRequest.Builder adapt(Put operation) { // Bigtable uses a 1ms granularity. Use this timestamp if the Put does not have one specified to // make mutations idempotent. - long currentTimestampMicros = setClientTimestamp ? System.currentTimeMillis() * 1000 : -1; + long currentTimestampMicros = setClientTimestamp ? clock.currentTimeMillis() * 1000 : -1; final int rowLength = operation.getRow().length; List mutations = new ArrayList<>();