From ed97a2f81e617428158e24552ad3a96e0460a875 Mon Sep 17 00:00:00 2001
From: Keith Turner
Date: Tue, 10 Dec 2024 15:32:30 -0500
Subject: [PATCH] fixes sorting in bulk batch writer (#289)

BulkBatchWriter was sorting only on the mutation row. Occasionally
continuous ingest would generate the same 64-bit id twice in the same
batch, and if its columns were not sorted properly then the write to the
rfile would fail. This fixes the problem by sorting on the entire key
instead of only the row.
---
 .../testing/continuous/BulkBatchWriter.java   | 49 ++++++++++++-------
 1 file changed, 32 insertions(+), 17 deletions(-)

diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
index 6e763b31..b5b68f98 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
@@ -18,8 +18,11 @@
  */
 package org.apache.accumulo.testing.continuous;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
@@ -32,6 +35,7 @@
 import org.apache.accumulo.core.client.rfile.RFile;
 import org.apache.accumulo.core.client.rfile.RFileWriter;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
 import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,7 +51,7 @@ public class BulkBatchWriter implements BatchWriter {
 
   private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);
 
-  private final List<Mutation> mutations = new ArrayList<>();
+  private final Deque<Mutation> mutations = new ArrayDeque<>();
   private final AccumuloClient client;
   private final String tableName;
   private final FileSystem fileSystem;
@@ -72,7 +76,7 @@ public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileS
   public synchronized void addMutation(Mutation mutation) throws MutationsRejectedException {
     Preconditions.checkState(!closed);
     mutation = new Mutation(mutation);
-    mutations.add(mutation);
+    mutations.addLast(mutation);
     memUsed += mutation.estimatedMemoryUsed();
     if (memUsed > memLimit) {
       flush();
@@ -96,7 +100,27 @@ public synchronized void flush() throws MutationsRejectedException {
       Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
       fileSystem.mkdirs(tmpDir);
 
-      mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow()));
+
+      List<KeyValue> keysValues = new ArrayList<>(mutations.size());
+
+      // remove mutations from the deque as we convert them to Keys, making the Mutation objects
+      // available for garbage collection
+      Mutation mutation;
+      while ((mutation = mutations.pollFirst()) != null) {
+        for (var columnUpdate : mutation.getUpdates()) {
+          var builder = Key.builder(false).row(mutation.getRow())
+              .family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier())
+              .visibility(columnUpdate.getColumnVisibility());
+          if (columnUpdate.hasTimestamp()) {
+            builder = builder.timestamp(columnUpdate.getTimestamp());
+          }
+          Key key = builder.deleted(columnUpdate.isDeleted()).build();
+          keysValues.add(new KeyValue(key, columnUpdate.getValue()));
+        }
+      }
+
+      Comparator<KeyValue> kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey());
+      keysValues.sort(kvComparator);
 
       RFileWriter writer = null;
       byte[] currEndRow = null;
@@ -104,14 +128,15 @@ public synchronized void flush() throws MutationsRejectedException {
 
       var loadPlanBuilder = LoadPlan.builder();
 
-      for (var mutation : mutations) {
+      for (var keyValue : keysValues) {
+        var key = keyValue.getKey();
         if (writer == null
-            || (currEndRow != null && Arrays.compare(mutation.getRow(), currEndRow) > 0)) {
+            || (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
           if (writer != null) {
             writer.close();
           }
 
-          var row = new Text(mutation.getRow());
+          var row = key.getRow();
           var headSet = splits.headSet(row);
           var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
           var tailSet = splits.tailSet(row);
@@ -126,17 +151,7 @@ public synchronized void flush() throws MutationsRejectedException {
           log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow);
         }
 
-        for (var colUpdate : mutation.getUpdates()) {
-          var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(),
-              colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility());
-          if (colUpdate.hasTimestamp()) {
-            key.setTimestamp(colUpdate.getTimestamp());
-          }
-          if (colUpdate.isDeleted()) {
-            key.setDeleted(true);
-          }
-          writer.append(key, colUpdate.getValue());
-        }
+        writer.append(key, keyValue.getValue());
       }
 
       if (writer != null) {
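
Note (illustration only, not part of the patch): a minimal sketch of the
failure mode the commit message describes. RFileWriter.append requires keys
in fully sorted order, but a row-only comparator treats two updates to the
same row as equal, so a stable sort leaves their column families in
insertion order. Comparing entire Keys fixes that. The class name
SortOrderDemo is hypothetical; it assumes only the Accumulo core client
library on the classpath.

  import org.apache.accumulo.core.data.Key;

  public class SortOrderDemo {
    public static void main(String[] args) {
      // Two updates to the same row, e.g. the same 64-bit continuous
      // ingest id generated twice in one batch.
      Key a = new Key("row1", "cf2", "cq1");
      Key b = new Key("row1", "cf1", "cq1");

      // A row-only sort can leave a (cf2) before b (cf1), an order that
      // RFileWriter.append would reject. Key.compareTo considers the
      // whole key, so sorting with it correctly places b before a.
      System.out.println(a.compareTo(b)); // positive: a sorts after b
    }
  }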