Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document bug in bulk batch writer and rename the class to indicate it's buggy #288

Merged
merged 3 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env,
var workDir = new Path(bulkWorkDir);
var filesystem = workDir.getFileSystem(conf);
var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit,
splitSupplier);
return tableName -> new FlakyBulkBatchWriter(client, tableName, filesystem, workDir,
memLimit, splitSupplier);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

public class BulkBatchWriter implements BatchWriter {
/**
* BatchWriter that bulk imports in its implementation. The implementation contains a bug that was
* found to be useful for testing Accumulo. The bug was left and this class was renamed to add Flaky
* to indicate its danger for other uses.
*/
public class FlakyBulkBatchWriter implements BatchWriter {

private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);
private static final Logger log = LoggerFactory.getLogger(FlakyBulkBatchWriter.class);

private final Deque<Mutation> mutations = new ArrayDeque<>();
private final AccumuloClient client;
Expand All @@ -62,7 +67,7 @@ public class BulkBatchWriter implements BatchWriter {
private long memUsed;
private boolean closed = false;

public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
public FlakyBulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
this.client = client;
this.tableName = tableName;
Expand Down Expand Up @@ -119,23 +124,38 @@ public synchronized void flush() throws MutationsRejectedException {
}
}

Comparator<KeyValue> kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey());
keysValues.sort(kvComparator);
keysValues.sort(Map.Entry.comparingByKey());

RFileWriter writer = null;
byte[] currEndRow = null;
int nextFileNameCounter = 0;

var loadPlanBuilder = LoadPlan.builder();

// This code is broken because Arrays.compare will compare bytes as signed integers. Accumulo
// treats bytes as unsigned 8 bit integers for sorting purposes. This incorrect comparator
// causes this code to sometimes prematurely close rfiles, which can lead to lots of files
// being bulk imported into a single tablet. The files still go to the correct tablet, so this
// does not cause data loss. This bug was found to be useful in testing as it introduces
// stress on bulk import+compactions and it was decided to keep this bug. If copying this code
// elsewhere then this bug should probably be fixed.
Comparator<byte[]> comparator = Arrays::compare;
// To fix the code above it should be replaced with the following
// Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();

for (var keyValue : keysValues) {
var key = keyValue.getKey();
if (writer == null
|| (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer == null || (currEndRow != null
&& comparator.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer != null) {
writer.close();
}

// When the above code prematurely closes a rfile because of the incorrect comparator, the
// following code will find a new Tablet. Since the following code uses the Text
// comparator, its comparisons are correct and it will just find the same tablet for the
// file that was just closed. This is what causes multiple files to be added to the same
// tablet.
var row = key.getRow();
var headSet = splits.headSet(row);
var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
Expand Down
Loading