Skip to content

Commit

Permalink
[Fix Python Bigtable dataloss bug] Stop unsetting timestamps of -1 (#…
Browse files Browse the repository at this point in the history
…28624)

* add test for unset timestamp

* pass thru all timestamps

* default to -1 in schematransform
  • Loading branch information
ahmedabu98 committed Sep 23, 2023
1 parent f635ade commit 68cf802
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,13 @@ public KV<ByteString, Iterable<Mutation>> apply(Row row) {
.setColumnQualifier(
ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get()))
.setFamilyNameBytes(
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()));
if (mutation.containsKey("timestamp_micros")) {
setMutation =
setMutation.setTimestampMicros(
Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get()));
}
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()))
// Use timestamp if provided, else default to -1 (current Bigtable server time)
.setTimestampMicros(
mutation.containsKey("timestamp_micros")
? Longs.fromByteArray(
ofNullable(mutation.get("timestamp_micros")).get())
: -1);
bigtableMutation = Mutation.newBuilder().setSetCell(setMutation.build()).build();
break;
case "DeleteFromColumn":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public void tearDown() {
public void testSetMutationsExistingColumn() {
RowMutation rowMutation =
RowMutation.create(tableId, "key-1")
.setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a")
.setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c");
.setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a")
.setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c");
dataClient.mutateRow(rowMutation);

List<Map<String, byte[]>> mutations = new ArrayList<>();
Expand All @@ -165,13 +165,15 @@ public void testSetMutationsExistingColumn() {
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-a".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8),
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)));
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8),
"timestamp_micros", Longs.toByteArray(2000)));
mutations.add(
ImmutableMap.of(
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-c".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8),
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8)));
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8),
"timestamp_micros", Longs.toByteArray(2000)));
Row mutationRow =
Row.withSchema(SCHEMA)
.withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8))
Expand Down Expand Up @@ -202,10 +204,11 @@ public void testSetMutationsExistingColumn() {
.collect(Collectors.toList());
assertEquals(2, cellsColA.size());
assertEquals(2, cellsColC.size());
System.out.println(cellsColA);
System.out.println(cellsColC);
assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8());
assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8());
// Bigtable keeps cell history ordered by descending timestamp
assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8());
assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8());
assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8());
assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8());
}

@Test
Expand Down
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,10 @@ def process(self, direct_row):
"type": b'SetCell',
"family_name": mutation.set_cell.family_name.encode('utf-8'),
"column_qualifier": mutation.set_cell.column_qualifier,
"value": mutation.set_cell.value
"value": mutation.set_cell.value,
"timestamp_micros": struct.pack(
'>q', mutation.set_cell.timestamp_micros)
}
micros = mutation.set_cell.timestamp_micros
if micros > -1:
mutation_dict['timestamp_micros'] = struct.pack('>q', micros)
elif mutation.__contains__("delete_from_column"):
mutation_dict = {
"type": b'DeleteFromColumn',
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ def test_set_mutation(self):
row1_col2_cell = Cell(b'val1-2', 200_000_000)
row2_col1_cell = Cell(b'val2-1', 100_000_000)
row2_col2_cell = Cell(b'val2-2', 200_000_000)
# When setting this cell, we won't set a timestamp. We expect the timestamp
# to default to -1, and Bigtable will set it to system time at insertion.
row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time())
# rows sent to write transform
row1.set_cell(
'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp)
Expand All @@ -232,6 +235,8 @@ def test_set_mutation(self):
'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp)
row2.set_cell(
'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp)
# don't set a timestamp here. it should default to -1
row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value)

self.run_pipeline([row1, row2])

Expand All @@ -249,6 +254,19 @@ def test_set_mutation(self):
self.assertEqual(
row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0])

# check mutation that doesn't have a timestamp set is handled properly:
self.assertEqual(
row2_col1_no_timestamp.value,
actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value)
# Bigtable sets timestamp as insertion time, which is later than the
# time.time() we set when creating this test case
cell_timestamp = actual_row2.find_cells('col_fam',
b'col-no-timestamp')[0].timestamp
self.assertTrue(
row2_col1_no_timestamp.timestamp < cell_timestamp,
msg="Expected cell with unset timestamp to have ingestion time "
f"attached, but was {cell_timestamp}")

def test_delete_cells_mutation(self):
col_fam = self.table.column_family('col_fam')
col_fam.create()
Expand Down

0 comments on commit 68cf802

Please sign in to comment.