Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Put all Collector actions in one execute #365

Merged
merged 15 commits into from
Nov 18, 2013
Merged

Put all Collector actions in one execute #365

merged 15 commits into from
Nov 18, 2013

Conversation

johnynek
Copy link
Collaborator

No description provided.

@johnynek
Copy link
Collaborator Author

@singhala @jnievelt guys

Review help would be great here.

This is a non-trivial change, and hopefully BIG improvement in storm correctness and error logging

private val count = new AtomicInteger(0)

def put(item: T): Int = {
queue.add(item)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was your intention for this to block if queue is an ArrayBlockingQueue with a fixed size? What actually will happen is that a java.lang.IllegalStateException: Queue full exception will be thrown instead of it actually blocking (you would need to call put instead for the blocking behavior).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. Fixed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks nice!

// always increment after this
protected def add(t: T): Unit
// always decrement after this
protected def pollOrNull: T
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If its an always, why not bake the behavior into these rather than at touch points?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we supply a more scala idiomatic one, return Option[T] and internally do the decrement if not null?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to make it easy to get that right, but if we require every subclass to call decrement, some will get it wrong. By contrast, if we do it below, we can audit it and test it once. This method is just for the adapter to existing java classes.

Option I'll do. That was we being OCD about allocations.... bad move.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So with the the atomic thing, i was thinking it might be more have the downstream api but then final functions which call them and do the decrement/increment. Such that there is only 2 places we need to ensure correctness. Not a big thing, its only a tiny class anyway.

@ianoc
Copy link
Collaborator

ianoc commented Nov 17, 2013

Looks great, nice work. All comments are really very minor

@ianoc
Copy link
Collaborator

ianoc commented Nov 18, 2013

This is looking good to me, if no one has any other comments by tomorrow morning i'm pretty happy to merge it

ianoc added a commit that referenced this pull request Nov 18, 2013
Put all Collector actions in one execute
@ianoc ianoc merged commit d2b4594 into develop Nov 18, 2013
@ianoc ianoc deleted the future-channel branch November 18, 2013 18:29
snoble pushed a commit to snoble/summingbird that referenced this pull request Sep 8, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants