This repository has been archived by the owner on Jan 20, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 266
Put all Collector actions in one execute #365
Merged
Merged
Changes from 9 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
a002ee1
Put all Collector actions in one execute
johnynek e4cdbd7
Put back a wrongly removed option
johnynek 403e1a0
change source to using Timestamp
johnynek e3e86e7
Fix the semigroup
johnynek f4ad6f5
fix bugs
johnynek 5b571ee
use linked queue for futures
johnynek 4938ff1
Improve logging
johnynek 446a7e1
Make FutureChannel use ConcurrentLinkedQueue
johnynek 63b3bbf
remove BoundedQueue
johnynek ca4beba
Address Ryan's comments
johnynek d26e3a0
Fix size bug, add some tests
johnynek e53e2e9
addressed Ian's comments
johnynek 37988e7
Fix a bad use of collector outside execute
johnynek b5b2045
remove localOrShuffleGrouping
johnynek 6531f74
Remove await on collect, which would throw
johnynek File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
117 changes: 117 additions & 0 deletions
117
summingbird-online/src/main/scala/com/twitter/summingbird/online/Channel.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
Copyright 2013 Twitter, Inc. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package com.twitter.summingbird.online | ||
|
||
import com.twitter.util.{ Await, Duration, Future, Try } | ||
|
||
import java.util.Queue | ||
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit} | ||
import java.util.concurrent.ConcurrentLinkedQueue | ||
import java.util.concurrent.atomic.AtomicInteger | ||
/** | ||
* | ||
* @author Oscar Boykin | ||
*/ | ||
|
||
object Channel { | ||
/** | ||
* By default, don't block on put | ||
*/ | ||
def apply[T]() = linkedNonBlocking[T] | ||
|
||
def arrayBlocking[T](size: Int): Channel[T] = | ||
new Channel[T](new ArrayBlockingQueue(size)) | ||
|
||
def linkedBlocking[T]: Channel[T] = | ||
new Channel[T](new LinkedBlockingQueue()) | ||
|
||
def linkedNonBlocking[T]: Channel[T] = | ||
new Channel[T](new ConcurrentLinkedQueue()) | ||
} | ||
|
||
/** | ||
* Use this class with a thread-safe queue to receive | ||
* results from futures in one thread. | ||
* Storm needs us to touch it's code in one event path (via | ||
* the execute method in bolts) | ||
*/ | ||
class Channel[T] private (queue: Queue[T]) { | ||
|
||
private val count = new AtomicInteger(0) | ||
|
||
def put(item: T): Int = { | ||
queue.add(item) | ||
count.incrementAndGet | ||
} | ||
|
||
/** Returns the size immediately after the put */ | ||
def putAll(items: TraversableOnce[T]): Int = { | ||
val added = items.foldLeft(0) { (cnt, item) => | ||
queue.add(item) | ||
cnt + 1 | ||
} | ||
count.addAndGet(added) | ||
} | ||
|
||
/** | ||
* check if something is ready now | ||
*/ | ||
def poll: Option[T] = Option(queue.poll()) | ||
|
||
/** | ||
* Obviously, this might not be the same by the time you | ||
* call spill | ||
*/ | ||
def size: Int = count.get | ||
|
||
// Do something on all the elements ready: | ||
@annotation.tailrec | ||
final def foreach(fn: T => Unit): Unit = | ||
queue.poll() match { | ||
case null => () | ||
case itt => fn(itt); foreach(fn) | ||
} | ||
|
||
// fold on all the elements ready: | ||
@annotation.tailrec | ||
final def foldLeft[V](init: V)(fn: (V, T) => V): V = { | ||
queue.poll() match { | ||
case null => init | ||
case itt => foldLeft(fn(init, itt))(fn) | ||
} | ||
} | ||
|
||
/** | ||
* Take enough elements to get the queue under the maxLength | ||
*/ | ||
def trimTo(maxLength: Int): Seq[T] = { | ||
require(maxLength >= 0, "maxLength must be >= 0.") | ||
|
||
@annotation.tailrec | ||
def loop(size: Int, acc: List[T] = Nil): List[T] = { | ||
if(size > maxLength) { | ||
queue.poll match { | ||
case null => acc.reverse // someone else cleared us out | ||
case item => | ||
loop(count.decrementAndGet, item::acc) | ||
} | ||
} | ||
else acc.reverse | ||
} | ||
loop(count.get) | ||
} | ||
} |
49 changes: 0 additions & 49 deletions
49
summingbird-online/src/main/scala/com/twitter/summingbird/online/FutureQueue.scala
This file was deleted.
Oops, something went wrong.
77 changes: 77 additions & 0 deletions
77
summingbird-online/src/test/scala/com/twitter/summingbird/online/QueueChannelLaws.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
Copyright 2013 Twitter, Inc. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package com.twitter.summingbird.online | ||
|
||
import org.scalacheck._ | ||
import Gen._ | ||
import Arbitrary._ | ||
import org.scalacheck.Prop._ | ||
|
||
import com.twitter.util.{Return, Throw, Future, Try} | ||
|
||
object QueueChannelLaws extends Properties("Channel") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great work with all the tests, the only one I don't see and am not sure matters is just ordering of put to poll There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will add. |
||
|
||
property("Putting into a BoundedQueue gets size right") = forAll { (items: List[String]) => | ||
val q = Channel[String]() | ||
q.putAll(items) | ||
q.size == items.size | ||
} | ||
property("not spill if capacity is enough") = forAll { (items: List[Int]) => | ||
val q = Channel[Int]() | ||
q.putAll(items) | ||
q.trimTo(items.size).size == 0 | ||
} | ||
property("Work with indepent additions") = forAll { (items: List[Int]) => | ||
val q = Channel[Int]() | ||
items.map(q.put(_)) == (1 to items.size).toList | ||
} | ||
property("spill all with zero capacity") = forAll { (items: List[Int]) => | ||
val q = Channel[Int]() | ||
q.putAll(items) | ||
q.trimTo(0) == items | ||
} | ||
property("Channel works with finished futures") = forAll { (items: List[Int]) => | ||
val q = Channel.linkedBlocking[(Int,Try[Int])] | ||
items.foreach { i => q.put((i, Try(i*i))) } | ||
q.foldLeft((0, true)) { case ((cnt, good), (i, ti)) => | ||
ti match { | ||
case Return(ii) => (cnt + 1, good) | ||
case Throw(e) => (cnt + 1, false) | ||
} | ||
} == (items.size, true) | ||
} | ||
property("Channel.linkedNonBlocking works") = forAll { (items: List[Int]) => | ||
val q = Channel.linkedNonBlocking[(Int,Try[Int])] | ||
items.foreach { i => q.put((i, Try(i*i))) } | ||
q.foldLeft((0, true)) { case ((cnt, good), (i, ti)) => | ||
ti match { | ||
case Return(ii) => (cnt + 1, good) | ||
case Throw(e) => (cnt + 1, false) | ||
} | ||
} == (items.size, true) | ||
} | ||
property("Channel foreach works") = forAll { (items: List[Int]) => | ||
// Make sure we can fit everything | ||
val q = Channel.arrayBlocking[(Int,Try[Int])](items.size + 1) | ||
items.foreach { i => q.put((i,Try(i*i))) } | ||
var works = true | ||
q.foreach { case (i, Return(ii)) => | ||
works = works && (ii == i*i) | ||
} | ||
works | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was your intention for this to block if
queue
is anArrayBlockingQueue
with a fixed size? What actually will happen is that ajava.lang.IllegalStateException: Queue full
exception will be thrown instead of it actually blocking (you would need to callput
instead for the blocking behavior).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks nice!