Skip to content

Commit

Permalink
[improve][meta] Fix invalid use of drain API and race condition in cl…
Browse files Browse the repository at this point in the history
…osing metadata store (apache#22585)

Co-authored-by: Matteo Merli <mmerli@apache.org>
  • Loading branch information
lhotari and merlimat authored Jun 14, 2024
1 parent c2702e9 commit f7d35e5
Showing 1 changed file with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,13 @@ public void close() throws Exception {
// Fail all the pending items
MetadataStoreException ex =
new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed");
readOps.drain(op -> op.getFuture().completeExceptionally(ex));
writeOps.drain(op -> op.getFuture().completeExceptionally(ex));

MetadataOp op;
while ((op = readOps.poll()) != null) {
op.getFuture().completeExceptionally(ex);
}
while ((op = writeOps.poll()) != null) {
op.getFuture().completeExceptionally(ex);
}
scheduledTask.cancel(true);
}
super.close();
Expand All @@ -98,7 +102,13 @@ public void close() throws Exception {
private void flush() {
while (!readOps.isEmpty()) {
List<MetadataOp> ops = new ArrayList<>();
readOps.drain(ops::add, maxOperations);
for (int i = 0; i < maxOperations; i++) {
MetadataOp op = readOps.poll();
if (op == null) {
break;
}
ops.add(op);
}
internalBatchOperation(ops);
}

Expand Down Expand Up @@ -167,6 +177,11 @@ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchroniz
}

private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
if (isClosed()) {
MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException();
op.getFuture().completeExceptionally(ex);
return;
}
if (enabled) {
if (!queue.offer(op)) {
// Execute individually if we're failing to enqueue
Expand All @@ -182,6 +197,12 @@ private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
}

private void internalBatchOperation(List<MetadataOp> ops) {
if (isClosed()) {
MetadataStoreException ex =
new MetadataStoreException.AlreadyClosedException();
ops.forEach(op -> op.getFuture().completeExceptionally(ex));
return;
}
long now = System.currentTimeMillis();
for (MetadataOp op : ops) {
this.batchMetadataStoreStats.recordOpWaiting(now - op.created());
Expand Down

0 comments on commit f7d35e5

Please sign in to comment.