diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java index 6e763b31..b5b68f98 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java @@ -18,8 +18,11 @@ */ package org.apache.accumulo.testing.continuous; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; +import java.util.Deque; import java.util.List; import java.util.Map; import java.util.SortedSet; @@ -32,6 +35,7 @@ import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.rfile.RFileWriter; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyValue; import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.fs.FileSystem; @@ -47,7 +51,7 @@ public class BulkBatchWriter implements BatchWriter { private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class); - private final List mutations = new ArrayList<>(); + private final Deque mutations = new ArrayDeque<>(); private final AccumuloClient client; private final String tableName; private final FileSystem fileSystem; @@ -72,7 +76,7 @@ public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileS public synchronized void addMutation(Mutation mutation) throws MutationsRejectedException { Preconditions.checkState(!closed); mutation = new Mutation(mutation); - mutations.add(mutation); + mutations.addLast(mutation); memUsed += mutation.estimatedMemoryUsed(); if (memUsed > memLimit) { flush(); @@ -96,7 +100,27 @@ public synchronized void flush() throws MutationsRejectedException { Path tmpDir = new Path(workPath, UUID.randomUUID().toString()); fileSystem.mkdirs(tmpDir); - mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow())); + + List keysValues = new ArrayList<>(mutations.size()); + + // remove mutations from the dequeue as we convert them to Keys making the Mutation objects + // available for garbage collection + Mutation mutation; + while ((mutation = mutations.pollFirst()) != null) { + for (var columnUpdate : mutation.getUpdates()) { + var builder = Key.builder(false).row(mutation.getRow()) + .family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier()) + .visibility(columnUpdate.getColumnVisibility()); + if (columnUpdate.hasTimestamp()) { + builder = builder.timestamp(columnUpdate.getTimestamp()); + } + Key key = builder.deleted(columnUpdate.isDeleted()).build(); + keysValues.add(new KeyValue(key, columnUpdate.getValue())); + } + } + + Comparator kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey()); + keysValues.sort(kvComparator); RFileWriter writer = null; byte[] currEndRow = null; @@ -104,14 +128,15 @@ public synchronized void flush() throws MutationsRejectedException { var loadPlanBuilder = LoadPlan.builder(); - for (var mutation : mutations) { + for (var keyValue : keysValues) { + var key = keyValue.getKey(); if (writer == null - || (currEndRow != null && Arrays.compare(mutation.getRow(), currEndRow) > 0)) { + || (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) { if (writer != null) { writer.close(); } - var row = new Text(mutation.getRow()); + var row = key.getRow(); var headSet = splits.headSet(row); var tabletPrevRow = headSet.isEmpty() ? null : headSet.last(); var tailSet = splits.tailSet(row); @@ -126,17 +151,7 @@ public synchronized void flush() throws MutationsRejectedException { log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow); } - for (var colUpdate : mutation.getUpdates()) { - var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(), - colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility()); - if (colUpdate.hasTimestamp()) { - key.setTimestamp(colUpdate.getTimestamp()); - } - if (colUpdate.isDeleted()) { - key.setDeleted(true); - } - writer.append(key, colUpdate.getValue()); - } + writer.append(key, keyValue.getValue()); } if (writer != null) {