diff --git a/core/src/main/scala/io/finch/EncodeStream.scala b/core/src/main/scala/io/finch/EncodeStream.scala new file mode 100644 index 000000000..9be46e5af --- /dev/null +++ b/core/src/main/scala/io/finch/EncodeStream.scala @@ -0,0 +1,26 @@ +package io.finch + +import com.twitter.io.{Buf, Reader} +import java.nio.charset.Charset + +/** + * A type-class that defines encoding of a stream in a shape of `S[F[_], A]` to Finagle's [[Reader]]. + */ +trait EncodeStream[S[_[_], _], F[_], A] { + + type ContentType <: String + + def apply(s: S[F, A], cs: Charset): Reader[Buf] +} + +object EncodeStream { + + type Aux[S[_[_], _], F[_], A, CT <: String] = + EncodeStream[S, F, A] { type ContentType = CT } + + type Json[S[_[_],_], F[_], A] = Aux[S, F, A, Application.Json] + + type Text[S[_[_],_], F[_], A] = Aux[S, F, A, Text.Plain] +} + + diff --git a/core/src/main/scala/io/finch/Endpoint.scala b/core/src/main/scala/io/finch/Endpoint.scala index 3636c6539..019554762 100644 --- a/core/src/main/scala/io/finch/Endpoint.scala +++ b/core/src/main/scala/io/finch/Endpoint.scala @@ -893,8 +893,8 @@ object Endpoint { * an `S[F, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. */ def streamBinaryBody[F[_], S[_[_], _]](implicit - liftReader: LiftReader[S, F], - F: Effect[F] + liftReader: LiftReader[S, F], + F: Effect[F] ): Endpoint[F, S[F, Buf]] = { new Endpoint[F, S[F, Buf]] { final def apply(input: Input): Endpoint.Result[F, S[F, Buf]] = { @@ -915,9 +915,9 @@ object Endpoint { } def streamJsonBody[F[_], S[_[_], _], A](implicit - streamDecoder: DecodeStream.Aux[S, F, A, Application.Json], - liftReader: LiftReader[S, F], - F: Effect[F] + streamDecoder: DecodeStream.Aux[S, F, A, Application.Json], + liftReader: LiftReader[S, F], + F: Effect[F] ): Endpoint[F, S[F, A]] = new Endpoint[F, S[F, A]] { final def apply(input: Input): Result[F, S[F, A]] = { streamBinaryBody.apply(input).map(streamDecoder(_, input.request.charsetOrUtf8)) diff --git a/core/src/main/scala/io/finch/Input.scala b/core/src/main/scala/io/finch/Input.scala index fc5baaa8c..d2bd68aee 100644 --- a/core/src/main/scala/io/finch/Input.scala +++ b/core/src/main/scala/io/finch/Input.scala @@ -3,7 +3,7 @@ package io.finch import cats.Eq import com.twitter.finagle.http.{Method, Request} import com.twitter.finagle.netty3.ChannelBufferBuf -import com.twitter.io.Buf +import com.twitter.io.{Buf, Reader} import java.nio.charset.{Charset, StandardCharsets} import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpMethod, HttpVersion} import org.jboss.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpPostRequestEncoder} @@ -30,8 +30,21 @@ final case class Input(request: Request, route: List[String]) { * ``` * import io.finch._, io.circe._ * - * val text: Input = Input.post("/").withBody[Text.Plain]("Text Body") - * val json: Input = Input.post("/").withBody[Application.Json](Map("json" -> "object")) + * val text = Input.post("/").withBody[Text.Plain]("Text Body") + * val json = Input.post("/").withBody[Application.Json](Map("json" -> "object")) + *``` + * + * Also possible to create chunked inputs passing a stream as an argument. + * + *``` + * import io.finch._, io.finch.iteratee._, cats.effect.IO, io.iteratee.Enumerator + * import io.finch.circe._, io.circe.generic.auto._ + * + * val enumerateText = Enumerator.enumerate[IO, String]("foo", "bar") + * val text = Input.post("/").withBody[Text.Plain](enumerateText) + * + * val enumerateJson = Enumerate.enumerate[IO, Map[String, String]](Map("foo" - "bar")) + * val json = Input.post("/").withBody[Application.Json](enumerateJson) *``` */ def withBody[CT <: String]: Input.Body[CT] = new Input.Body[CT](this) @@ -75,7 +88,7 @@ final case class Input(request: Request, route: List[String]) { ) } else ChannelBufferBuf.Owned(req.getContent) - withBody[Application.WwwFormUrlencoded](content, Some(StandardCharsets.UTF_8)) + withBody[Application.WwwFormUrlencoded](content, StandardCharsets.UTF_8) } } @@ -84,12 +97,13 @@ final case class Input(request: Request, route: List[String]) { */ object Input { - private final def copyRequest(from: Request): Request = { - val to = Request() - to.version = from.version - to.method = from.method + private final def copyRequest(from: Request): Request = + copyRequestWithReader(from, from.reader) + + private final def copyRequestWithReader(from: Request, reader: Reader[Buf]): Request = { + val to = Request(from.version, from.method, from.uri, reader) + to.setChunked(from.isChunked) to.content = from.content - to.uri = from.uri from.headerMap.foreach { case (k, v) => to.headerMap.put(k, v) } to @@ -99,17 +113,39 @@ object Input { * A helper class that captures the `Content-Type` of the payload. */ class Body[CT <: String](i: Input) { - def apply[A](body: A, charset: Option[Charset] = None)(implicit - e: Encode.Aux[A, CT], w: Witness.Aux[CT] - ): Input = { - val content = e(body, charset.getOrElse(StandardCharsets.UTF_8)) + def apply[A](body: A)(implicit e: Encode.Aux[A, CT], w: Witness.Aux[CT]): Input = + apply[A](body, StandardCharsets.UTF_8) + def apply[A](body: A, charset: Charset)(implicit + e: Encode.Aux[A, CT], W: Witness.Aux[CT] + ): Input = { + val content = e(body, charset) val copied = copyRequest(i.request) + copied.setChunked(false) copied.content = content - copied.contentType = w.value + copied.contentType = W.value copied.contentLength = content.length.toLong - charset.foreach(cs => copied.charset = cs.displayName().toLowerCase) + copied.charset = charset.displayName().toLowerCase + + Input(copied, i.route) + } + + def apply[S[_[_], _], F[_], A](s: S[F, A])(implicit + S: EncodeStream.Aux[S, F, A, CT], W: Witness.Aux[CT] + ): Input = apply[S, F, A](s, StandardCharsets.UTF_8) + + def apply[S[_[_], _], F[_], A](s: S[F, A], charset: Charset)(implicit + S: EncodeStream.Aux[S, F, A, CT], + W: Witness.Aux[CT] + ): Input = { + val content = S(s, charset) + val copied = copyRequestWithReader(i.request, content) + + copied.setChunked(true) + copied.contentType = W.value + copied.headerMap.setUnsafe("Transfer-Encoding", "chunked") + copied.charset = charset.displayName().toLowerCase Input(copied, i.route) } diff --git a/core/src/main/scala/io/finch/LiftReader.scala b/core/src/main/scala/io/finch/LiftReader.scala index df85faab1..a29cb3465 100644 --- a/core/src/main/scala/io/finch/LiftReader.scala +++ b/core/src/main/scala/io/finch/LiftReader.scala @@ -8,13 +8,10 @@ import com.twitter.io.{Buf, Reader} trait LiftReader[S[_[_], _], F[_]] { def apply(reader: Reader[Buf]): S[F, Buf] - } object LiftReader { - def instance[S[_[_], _], F[_]](fn: Reader[Buf] => S[F, Buf]): LiftReader[S, F] = new LiftReader[S, F] { def apply(reader: Reader[Buf]): S[F, Buf] = fn(reader) } - } diff --git a/core/src/main/scala/io/finch/ToResponse.scala b/core/src/main/scala/io/finch/ToResponse.scala index c14d52bc5..191e344e4 100644 --- a/core/src/main/scala/io/finch/ToResponse.scala +++ b/core/src/main/scala/io/finch/ToResponse.scala @@ -76,7 +76,7 @@ trait HighPriorityToResponseInstances extends LowPriorityToResponseInstances { w: Witness.Aux[CT] ): Aux[A, CT] = instance { (a, cs) => val buf = e(a, cs) - val rep = Response() + val rep = Response(Version.Http11, Status.Ok) if (!buf.isEmpty) { rep.content = buf @@ -85,6 +85,18 @@ trait HighPriorityToResponseInstances extends LowPriorityToResponseInstances { rep } + + implicit def streamToResponse[S[_[_], _], F[_], A, CT <: String](implicit + E: EncodeStream.Aux[S, F, A, CT], + W: Witness.Aux[CT] + ): Aux[S[F, A], CT] = instance { (a, cs) => + val stream = E(a, cs) + val rep = Response(Version.Http11, Status.Ok, stream) + rep.headerMap.setUnsafe("Content-Type", W.value) + rep.headerMap.setUnsafe("Transfer-Encoding", "chunked") + + rep + } } object ToResponse extends HighPriorityToResponseInstances { diff --git a/core/src/test/scala/io/finch/InputSpec.scala b/core/src/test/scala/io/finch/InputSpec.scala index 6ae96152f..a174b07b4 100644 --- a/core/src/test/scala/io/finch/InputSpec.scala +++ b/core/src/test/scala/io/finch/InputSpec.scala @@ -3,7 +3,8 @@ package io.finch import java.nio.charset.Charset import com.twitter.finagle.http.Method -import com.twitter.io.Buf +import com.twitter.io.{Buf, Pipe, Reader} +import com.twitter.util.{Await, Future} import io.finch.data.Foo import io.finch.internal.HttpContent @@ -34,21 +35,42 @@ class InputSpec extends FinchSpec { } } - it should "add content through withBody" in { + it should "add fully-buffered content through withBody" in { check { (i: Input, b: Buf) => i.withBody[Text.Plain](b).request.content === b } } - it should "add content corresponding to a class through withBody[JSON]" in { - implicit val encodeFoo: Encode.Json[Foo] = Encode.json( - (a, cs) => Buf.ByteArray.Owned(a.s.getBytes(cs.name)) - ) + it should "add chunked content throhg withBody" in { + type ListStream[F[_], A] = List[A] + implicit def listToReader[F[_], CT <: String]: EncodeStream.Aux[ListStream, F, Buf, CT] = + new EncodeStream[ListStream, F, Buf] { + type ContentType = CT + + def apply(s: ListStream[F, Buf], cs: Charset): Reader[Buf] = { + val p = new Pipe[Buf] + + def loop(from: List[Buf]): Future[Unit] = from match { + case h :: t => p.write(h).before(loop(t)) + case _ => p.close() + } + + loop(s) + p + } + } + check { (i: Input, s: List[Buf]) => + val out = i.withBody[Application.OctetStream].apply[ListStream, Future, Buf](s).request.reader + s.forall(buf => buf == Await.result(out.read()).get) + } + } + + it should "add content corresponding to a class through withBody[JSON]" in { check { (i: Input, f: Foo, cs: Charset) => - val input = i.withBody[Application.Json](f, Some(cs)) + val input = i.withBody[Application.Json](f, cs) - input.request.content.asString(cs) === f.s && + input.request.content.asString(cs) === s"""{s:"${f.s}"""" && input.request.contentType === Some(s"application/json;charset=${cs.displayName.toLowerCase}") } } diff --git a/core/src/test/scala/io/finch/data/Foo.scala b/core/src/test/scala/io/finch/data/Foo.scala index ecd86a7e5..fcfbef917 100644 --- a/core/src/test/scala/io/finch/data/Foo.scala +++ b/core/src/test/scala/io/finch/data/Foo.scala @@ -29,6 +29,9 @@ object Foo { ) ) + implicit val encodeJsonFoo: Encode.Json[Foo] = + Encode.json((foo, cs) => Buf.ByteArray.Owned(s"""{s:"${foo.s}"""".getBytes(cs))) + implicit val arbitraryFoo: Arbitrary[Foo] = Arbitrary(Gen.alphaStr.suchThat(_.nonEmpty).map(Foo.apply)) } diff --git a/fs2/src/main/scala/io/finch/fs2/package.scala b/fs2/src/main/scala/io/finch/fs2/package.scala index 5923caad0..31e16f1f6 100644 --- a/fs2/src/main/scala/io/finch/fs2/package.scala +++ b/fs2/src/main/scala/io/finch/fs2/package.scala @@ -2,63 +2,67 @@ package io.finch import _root_.fs2.Stream import cats.effect.Effect -import com.twitter.finagle.http.Response -import com.twitter.io.Buf +import com.twitter.io.{Buf, Pipe, Reader, Writer} import com.twitter.util.Future -import shapeless.Witness +import java.nio.charset.Charset package object fs2 extends StreamInstances { - implicit def streamLiftReader[F[_] : Effect](implicit toEffect: ToEffect[Future, F]): LiftReader[Stream, F] = - LiftReader.instance { reader => - Stream - .repeatEval(Effect[F].defer(toEffect(reader.read))) - .unNoneTerminate - .onFinalize(Effect[F].delay(reader.discard())) - } - - implicit def streamToJsonResponse[F[_] : Effect, A](implicit - e: Encode.Aux[A, Application.Json], - w: Witness.Aux[Application.Json] - ): ToResponse.Aux[Stream[F, A], Application.Json] = { - mkToResponse[F, A, Application.Json](delimiter = Some(ToResponse.NewLine)) + implicit def streamLiftReader[F[_]](implicit + F: Effect[F], + TE: ToEffect[Future, F] + ): LiftReader[Stream, F] = LiftReader.instance { reader => + Stream + .repeatEval(F.defer(TE(reader.read))) + .unNoneTerminate + .onFinalize(F.delay(reader.discard())) } + implicit def encodeJsonFs2Stream[F[_]: Effect, A](implicit + A: Encode.Json[A] + ): EncodeStream.Json[Stream, F, A] = + new EncodeFs2Stream[F, A, Application.Json] { + protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs) + override protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = + w.write(chunk.concat(ToResponse.NewLine)) + } + + implicit def encodeTextFs2Stream[F[_]: Effect, A](implicit + A: Encode.Text[A] + ): EncodeStream.Text[Stream, F, A] = + new EncodeFs2Stream[F, A, Text.Plain] { + override protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs) + } } trait StreamInstances { + protected abstract class EncodeFs2Stream[F[_], A, CT <: String](implicit + F: Effect[F], + TE: ToEffect[Future, F] + ) extends EncodeStream[Stream, F, A] { - implicit def streamToResponse[F[_] : Effect, A, CT <: String](implicit - e: Encode.Aux[A, CT], - w: Witness.Aux[CT], - toEffect: ToEffect[Future, F] - ): ToResponse.Aux[Stream[F, A], CT] = { - mkToResponse[F, A, CT](delimiter = None) - } + type ContentType = CT + + protected def encodeChunk(chunk: A, cs: Charset): Buf + protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = w.write(chunk) - protected def mkToResponse[F[_] : Effect, A, CT <: String](delimiter: Option[Buf])(implicit - e: Encode.Aux[A, CT], - w: Witness.Aux[CT], - toEffect: ToEffect[Future, F] - ): ToResponse.Aux[Stream[F, A], CT] = { - ToResponse.instance[Stream[F, A], CT]((stream, cs) => { - val response = Response() - response.setChunked(true) - response.contentType = w.value - val writer = response.writer - val effect = stream - .map(e.apply(_, cs)) - .evalMap(buf => toEffect(writer.write(delimiter match { - case Some(d) => buf.concat(d) - case _ => buf - }))) - .onFinalize(Effect[F].defer(toEffect(writer.close()))) + def apply(s: Stream[F, A], cs: Charset): Reader[Buf] = { + val p = new Pipe[Buf] + val run = s + .map(chunk => encodeChunk(chunk, cs)) + .evalMap(chunk => TE(writeChunk(chunk, p))) + .onFinalize(F.suspend(TE(p.close()))) .compile .drain - Effect[F].toIO(effect).unsafeRunAsyncAndForget() - response - }) + F.toIO(run).unsafeRunAsyncAndForget() + p + } } + implicit def encodeBufFs2[F[_]: Effect, CT <: String]: EncodeStream.Aux[Stream, F, Buf, CT] = + new EncodeFs2Stream[F, Buf, CT] { + protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk + } + } diff --git a/iteratee/src/main/scala/io/finch/iteratee/package.scala b/iteratee/src/main/scala/io/finch/iteratee/package.scala index f5f8d9453..21948c991 100644 --- a/iteratee/src/main/scala/io/finch/iteratee/package.scala +++ b/iteratee/src/main/scala/io/finch/iteratee/package.scala @@ -1,98 +1,115 @@ -package io.finch - -import cats.effect.Effect -import com.twitter.finagle.http.Response -import com.twitter.io._ -import com.twitter.util.Future -import io.finch.internal._ -import io.finch.items.RequestItem -import io.iteratee.{Enumerator, Iteratee} -import shapeless.Witness - -/** - * Iteratee module - */ -package object iteratee extends IterateeInstances { - - - /** - * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as - * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. - */ - @deprecated("Use Endpoint.streamJsonBody or Endpoint.streamBinaryBody instead", "0.27.0") - def enumeratorBody[F[_] : Effect, A, CT <: String](implicit - decode: DecodeStream.Aux[Enumerator, F, A, CT] - ): Endpoint[F, Enumerator[F, A]] = new Endpoint[F, Enumerator[F, A]] { - final def apply(input: Input): Endpoint.Result[F, Enumerator[F, A]] = { - if (!input.request.isChunked) EndpointResult.NotMatched[F] - else { - val req = input.request - EndpointResult.Matched( - input, - Trace.empty, - Effect[F].pure(Output.payload(decode(enumeratorLiftReader.apply(req.reader), req.charsetOrUtf8))) - ) - } - } - - final override def item: RequestItem = items.BodyItem - final override def toString: String = "enumeratorBody" - } - - implicit def enumeratorLiftReader[F[_] : Effect](implicit - toEffect: ToEffect[Future, F] - ): LiftReader[Enumerator, F] = - LiftReader.instance { reader => - def rec(reader: Reader[Buf]): Enumerator[F, Buf] = { - Enumerator.liftM[F, Option[Buf]] { - Effect[F].defer(toEffect(reader.read())) - }.flatMap { - case None => Enumerator.empty[F, Buf] - case Some(buf) => Enumerator.enumOne[F, Buf](buf).append(rec(reader)) - } - } - rec(reader).ensure(Effect[F].delay(reader.discard())) - } - - implicit def enumeratorToJsonResponse[F[_] : Effect, A](implicit - e: Encode.Aux[A, Application.Json], - w: Witness.Aux[Application.Json] - ): ToResponse.Aux[Enumerator[F, A], Application.Json] = { - mkToResponse[F, A, Application.Json](delimiter = Some(ToResponse.NewLine)) - } - -} - -trait IterateeInstances { - - implicit def enumeratorToResponse[F[_] : Effect, A, CT <: String](implicit - e: Encode.Aux[A, CT], - w: Witness.Aux[CT], - toEffect: ToEffect[Future, F] - ): ToResponse.Aux[Enumerator[F, A], CT] = { - mkToResponse[F, A, CT](delimiter = None) - } - - protected def mkToResponse[F[_] : Effect, A, CT <: String](delimiter: Option[Buf])(implicit - e: Encode.Aux[A, CT], - w: Witness.Aux[CT], - toEffect: ToEffect[Future, F] - ): ToResponse.Aux[Enumerator[F, A], CT] = { - ToResponse.instance[Enumerator[F, A], CT]((enum, cs) => { - val response = Response() - response.setChunked(true) - response.contentType = w.value - val writer = response.writer - val iteratee = Iteratee.foreachM[F, Buf]((buf: Buf) => toEffect(writer.write(delimiter match { - case Some(d) => buf.concat(d) - case _ => buf - }))) - val stream = enum - .ensure(Effect[F].defer(toEffect(writer.close()))) - .map(e.apply(_, cs)) - .into(iteratee) - Effect[F].toIO(stream).unsafeRunAsyncAndForget() - response - }) - } -} +package io.finch + +import cats.effect.Effect +import com.twitter.io._ +import com.twitter.util.Future +import io.finch.internal._ +import io.finch.items.RequestItem +import io.iteratee.{Enumerator, Iteratee} +import java.nio.charset.Charset + +package object iteratee extends IterateeInstances { + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as + * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + @deprecated("Use Endpoint.streamJsonBody or Endpoint.streamBinaryBody instead", "0.27.0") + def enumeratorBody[F[_], A, CT <: String](implicit + F: Effect[F], + LR: LiftReader[Enumerator, F], + decode: DecodeStream.Aux[Enumerator, F, A, CT] + ): Endpoint[F, Enumerator[F, A]] = new Endpoint[F, Enumerator[F, A]] { + final def apply(input: Input): Endpoint.Result[F, Enumerator[F, A]] = { + if (!input.request.isChunked) EndpointResult.NotMatched[F] + else { + val req = input.request + EndpointResult.Matched( + input, + Trace.empty, + F.pure(Output.payload(decode(LR(req.reader), req.charsetOrUtf8))) + ) + } + } + + final override def item: RequestItem = items.BodyItem + final override def toString: String = "enumeratorBody" + } + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as + * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + @deprecated("Use Endpoint.streamJsonBody or Endpoint.streamBinaryBody instead", "0.27.0") + def enumeratorJsonBody[F[_] : Effect, A](implicit + decode: DecodeStream.Aux[Enumerator, F, A, Application.Json] + ): Endpoint[F, Enumerator[F, A]] = enumeratorBody[F, A, Application.Json].withToString("enumeratorJsonBody") +} + +trait IterateeInstances extends LowPriorityIterateeInstances { + + implicit def enumeratorLiftReader[F[_]](implicit + F: Effect[F], + TE: ToEffect[Future, F] + ): LiftReader[Enumerator, F] = LiftReader.instance { reader => + def loop(): Enumerator[F, Buf] = { + Enumerator.liftM[F, Option[Buf]] { + F.defer(TE(reader.read())) + }.flatMap { + case None => Enumerator.empty[F, Buf] + case Some(buf) => Enumerator.enumOne[F, Buf](buf).append(loop()) + } + } + + loop().ensure(F.delay(reader.discard())) + } + + implicit def encodeJsonEnumerator[F[_]: Effect, A](implicit + A: Encode.Json[A] + ): EncodeStream.Json[Enumerator, F, A] = + new EncodeEnumerator[F, A, Application.Json] { + protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs) + override protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = + w.write(chunk.concat(ToResponse.NewLine)) + } + + implicit def encodeTextEnumerator[F[_]: Effect, A](implicit + A: Encode.Text[A] + ): EncodeStream.Text[Enumerator, F, A] = + new EncodeEnumerator[F, A, Text.Plain] { + override protected def encodeChunk(chunk: A, cs: Charset): Buf = A(chunk, cs) + } +} + +trait LowPriorityIterateeInstances { + + protected abstract class EncodeEnumerator[F[_], A, CT <: String](implicit + F: Effect[F], + TE: ToEffect[Future, F] + ) extends EncodeStream[Enumerator, F, A] { + + type ContentType = CT + + protected def encodeChunk(chunk: A, cs: Charset): Buf + protected def writeChunk(chunk: Buf, w: Writer[Buf]): Future[Unit] = w.write(chunk) + + private def writeIteratee(w: Writer[Buf]): Iteratee[F, Buf, Unit] = + Iteratee.foreachM[F, Buf](chunk => TE(writeChunk(chunk, w))) + + def apply(s: Enumerator[F, A], cs: Charset): Reader[Buf] = { + val p = new Pipe[Buf] + val run = s + .ensure(F.suspend(TE(p.close()))) + .map(chunk => encodeChunk(chunk, cs)) + .into(writeIteratee(p)) + + F.toIO(run).unsafeRunAsyncAndForget() + p + } + } + + implicit def encodeBufEnumerator[F[_]: Effect, CT <: String]: EncodeStream.Aux[Enumerator, F, Buf, CT] = + new EncodeEnumerator[F, Buf, CT] { + protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk + } +}