-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix unintended unchunking by adding override map #3189
Conversation
Thanks! Can you run |
@@ -1024,6 +1024,11 @@ class StreamSuite extends Fs2Suite { | |||
assert(compileErrors("Stream.eval(IO(1)).through(p)").nonEmpty) | |||
} | |||
|
|||
test("Unintended unchunking when using Monad instance of Stream") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you have only tested StreamLowPriority instance. I think that duplicating this test and adding .covary[IO]
before .through
(and .compile
before .toList
) will be sufficient :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I will do that.
Hi, I run prePR and add the new test. :) |
Re-triggering CI |
test("Unintended unchunking when using StreamLowPriority#monadInstance") { | ||
def generateTuple(pipe: Pipe[fs2.Pure, Int, (Int, Int)]) = | ||
(Stream(1, 2, 3) ++ Stream(4, 5, 6)).through(pipe).chunks.toList | ||
assertEquals(generateTuple(_.map(e => e -> (e + 1))), generateTuple(_.fproduct(_ + 1))) | ||
} | ||
|
||
test("Unintended unchunking when using Stream#monadErrorInstance") { | ||
def generateTuple(pipe: Pipe[IO, Int, (Int, Int)]) = | ||
(Stream(1, 2, 3) ++ Stream(4, 5, 6)).covary[IO].through(pipe).chunks.compile.toList | ||
val lhs = generateTuple(_.map(e => e -> (e + 1))) | ||
val rhs = generateTuple(_.fproduct(_ + 1)) | ||
(lhs, rhs).mapN(assertEquals(_, _)) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests seem to be needlessly complicated, with the use of different literal values, the tuple, the increment, etc.
In a "test-driven-development" fashion, the process is to 1. write the smallest possible tests that fails 2. change code, 3. check that test passes. It seems to me that would be achieved, for the problem, with a test core that did the following:
def countChunks(str: Stream[Nothing, _]): Int =
str.chunks.toList.length
val source = Stream(0) ++ Stream(0, 0)
val mapped = source.map(_ => 1)
assertEquals(countChunks(mapped), 2)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your review!
It's helpful, and I will make a change for this:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, sorry if I made some mistakes.
I run a local test and found that only calling the 'map' method like source.map(_ => 1)
can't call the method I changed. :)
I changed my code to below to make it simpler, is it OK?
test("Unintended unchunking when using StreamLowPriority#monadInstance") {
def countChunks(): Int =
(Stream(0) ++ Stream(0, 0)).fproduct(_ + 1).chunks.toList.length
assertEquals(countChunks(), 2)
}
test("Unintended unchunking when using Stream#monadErrorInstance") {
def countChunks() =
(Stream(0) ++ Stream(0, 0)).covary[IO].fproduct(_ + 1).chunks.compile.toList.map(_.length)
countChunks().assertEquals(2)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great point, if we add methods directly on stream in the future it may break these tests.
Instead I recommend doing like this:
Monad[Stream[F, *]].map(source)(_ => 1)
MonadError[Stream[F, *]].map(source)(_ => 1)
That way we can be sure we are testing exactly the right thing :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we can do even better.
Stream.monadInstance.map(source)(_ => 1)
// etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, but I encountered a problem. :(
Stream.monadInstance.map(source)
works well, but for Stream.monadErrorInstance.map(str)
, my IDE shows 'No implicits found for parameter ev: ApplicativeError[F_ , Throwable]', what's that? 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for trying that! If you make the change I suggested in #3189 (comment), then we only need one test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, but can it cover the 'MonadErrorInstance'? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because my proposal is that Monad
and MonadError
implementations should share the same code. Then we don't need to write the same code twice, fix the same bugs twice, and add the same tests twice :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good.
I will look at the repo and make them share the same code.
@@ -5222,6 +5222,7 @@ object Stream extends StreamLowPriority { | |||
def handleErrorWith[A](s: Stream[F, A])(h: Throwable => Stream[F, A]) = | |||
s.handleErrorWith(h) | |||
def raiseError[A](t: Throwable) = Stream.raiseError[F](t) | |||
override def map[A, B](fa: Stream[F, A])(f: A => B): Stream[F, B] = fa.map(f) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unfortunate that this fix has to be duplicated. Perhaps we should extract a:
class StreamMonad[F] extends Monad[Stream[F, *]] {
...
}
Then the code can be shared.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now my understanding is:🤔
After extracting a class like
class StreamMonad[F] extends Monad[Stream[F, *]] {
...
}
the original code
private[fs2] trait StreamLowPriority {
implicit def monadInstance[F[_]]: Monad[Stream[F, *]] =
new Monad[Stream[F, *]] {
...
}
will become like below?
private[fs2] trait StreamLowPriority {
implicit def monadInstance[F[_]]: StreamMonad[F] =
new StreamMonad[F]] {
...
}
}
But if it is, how to do a similar thing to MonadError[Stream[F, *], Throwable]? I see it has the second param called 'Throwable'. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
implicit def monadErrorInstance[F[_]]: MonadError[Stream[F, *], Throwable] =
new StreamMonad[F] with MonadError[Stream[F, *], Throwable] {
// inherit monad methods, implement monad error methods
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! It's very helpful.😊
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@armanbilge IMHIO it is always good practice to give explicit names to long classes, just as it is good practice to name long lambdas. @yyy1000 Just remember to keep it private to the Stream
companion object.
I try to extract the class and please review. :) |
@@ -3089,6 +3089,21 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, | |||
override def toString: String = "Stream(..)" | |||
} | |||
|
|||
class StreamMonad[F[_]] extends Monad[Stream[F, *]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we place this inside the Stream
companion object and mark it as private
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will do it the next day :), (it's a little late in my place)!
override def pure[A](x: A): Stream[F, A] = Stream.emit(x) | ||
|
||
implicit def monadInstance[F[_]]: StreamMonad[F] = | ||
new StreamMonad[F] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since StreamMonad
is not an abstract class, you can omit the body (the {
/ }
block) here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! :)
If I do this, I will move the unit
method to the StreamMonad
. And monadErrorInstance
will also use that method.
But it don't use the override unit method before.
override def tailRecM[A, B](a: A)(f: A => Stream[F, Either[A, B]]): Stream[F, B] = | ||
f(a).flatMap { | ||
case Left(a) => tailRecM(a)(f) | ||
case Right(b) => Stream(b) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you do extends StackSafeMonad[Stream[F, *]]
instead of Monad
then we don't need this method implementation :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK!
Co-authored-by: Arman Bilge <armanbilge@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all your work on this!
It wants to fix #3187
It's my first time to PR typelevel! If there's something I need to improve or change, I'm willing to do that!