Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-379] DrainingHashEntry.refCount will increase when hashes move between existing consumers #23421

Closed
lhotari opened this issue Oct 8, 2024 · 2 comments
Labels
release/blocker Indicate the PR or issue that should block the release until it gets resolved

Comments

@lhotari
Copy link
Member

lhotari commented Oct 8, 2024

Background

  • drainingHashesTracker.addEntry(c, stickyKeyHash) will increase the variable DrainingHashEntry.refCount.

Issue: the variable named DrainingHashEntry.refCount will be increased repeatedly at the following scenario:

  • Register C1, assign k1 to C1
  • Register C2, move k1 to C2
    • DrainingHashEntry.refCount increases
  • Register C3, move k1 to C3
    • DrainingHashEntry.refCount increases

Originally posted by @poorbarcode in #23352 (comment)

@lhotari
Copy link
Member Author

lhotari commented Oct 10, 2024

  • Register C1, assign k1 to C1

  • Register C2, move k1 to C2

    • DrainingHashEntry.refCount increases
  • Register C3, move k1 to C3

    • DrainingHashEntry.refCount increases

This is already covered. While the hash is assigned to another consumer, the "draining hash" entry can only be assigned to a single consumer at a time. The reference count won't get increased for other consumers.

several locations:

} else if (entry.getConsumer() != consumer) {
throw new IllegalStateException(
"Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
+ " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
+ ".");

DrainingHashesTracker.DrainingHashEntry drainingHashEntry = drainingHashesTracker.getEntry(stickyKeyHash);
if (drainingHashEntry != null && drainingHashEntry.getConsumer() != consumer) {
log.warn("[{}] Another consumer id {} is already draining hash {}. Skipping adding {}:{} to pending acks "
+ "for consumer {}. Adding the message to replay.",
getName(), drainingHashEntry.getConsumer(), stickyKeyHash, ledgerId, entryId, consumer);
addMessageToReplay(ledgerId, entryId, stickyKeyHash);
// block message from sending
return false;
}

This is for the reassignment case:

// hash has been reassigned to the original consumer, remove the entry
// and don't block the hash
if (entry.getConsumer() == consumer) {
log.info("[{}] Hash {} has been reassigned consumer {}. "
+ "The draining hash entry with refCount={} will be removed.",
dispatcherName, stickyKeyHash, entry.getConsumer(), entry.refCount);
drainingHashes.remove(stickyKeyHash, entry);
return false;
}

@lhotari
Copy link
Member Author

lhotari commented Oct 11, 2024

Closing this as a non-issue. /cc @equanz

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release/blocker Indicate the PR or issue that should block the release until it gets resolved
Development

No branches or pull requests

1 participant