Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to Pekko (used in tests only anyway) #198

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions netty-reactive-streams-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@
<artifactId>testng</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.12</artifactId>
<version>${akka-stream.version}</version>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version is defined in parent pom anyway

<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_2.12</artifactId>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.typesafe.netty.http;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.function.Function;
import akka.stream.Materializer;
import akka.stream.javadsl.*;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.*;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http.*;
Expand All @@ -30,11 +30,11 @@
* body back.
*
* The server uses the {@link HttpStreamsServerHandler}, and then exposes the messages sent/received by
* that using reactive streams. So it effectively uses streams of streams. It then uses Akka streams
* that using reactive streams. So it effectively uses streams of streams. It then uses Pekko streams
* to actually handle the requests, echoing the bodies back in the responses as is.
*
* The client uses the {@link HttpStreamsClientHandler}, and then exposes the messages sent/received by
* that using reactive streams, so it too is effectively a stream of streams. Here Akka streams is used
* that using reactive streams, so it too is effectively a stream of streams. Here Pekko streams is used
* to split the String bodies into many chunks, for more interesting verification of the bodies, and then
* combines all the chunks together back into a String at the end.
*/
Expand Down Expand Up @@ -72,7 +72,7 @@ public HttpResponse apply(HttpRequest request) throws Exception {
serverBindChannel = server.bind(new InetSocketAddress("127.0.0.1", 0), new Callable<Processor<HttpRequest, HttpResponse>>() {
@Override
public Processor<HttpRequest, HttpResponse> call() throws Exception {
return AkkaStreamsUtil.flowToProcessor(flow, materializer);
return PekkoStreamsUtil.flowToProcessor(flow, materializer);
}
}).await().channel();
}
Expand Down Expand Up @@ -112,7 +112,7 @@ public HttpRequest apply(String body) throws Exception {
}
})
// Send the flow via the HTTP client connection
.via(AkkaStreamsUtil.processorToFlow(connection))
.via(PekkoStreamsUtil.processorToFlow(connection))
// Convert the responses to Strings
.mapAsync(4, new Function<HttpResponse, CompletionStage<String>>() {
@Override
Expand All @@ -121,7 +121,7 @@ public CompletionStage<String> apply(HttpResponse response) throws Exception {
}
});

return AkkaStreamsUtil.flowToProcessor(flow, materializer);
return PekkoStreamsUtil.flowToProcessor(flow, materializer);
}

private Processor<HttpRequest, HttpResponse> getProcessor(ProcessorHttpClient client) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.typesafe.netty.http;

import akka.japi.function.Function2;
import akka.stream.Materializer;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.apache.pekko.japi.function.Function2;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.AsPublisher;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.*;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.typesafe.netty.http;

import akka.actor.ActorSystem;
import akka.japi.function.Function;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Flow;
import com.typesafe.netty.HandlerPublisher;
import com.typesafe.netty.HandlerSubscriber;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -226,7 +226,7 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
HandlerPublisher<HttpRequest> publisher = new HandlerPublisher<>(ctx.executor(), HttpRequest.class);
HandlerSubscriber<HttpResponse> subscriber = new HandlerSubscriber<>(ctx.executor());
ctx.pipeline().addLast(publisher, subscriber);
Processor<HttpRequest, HttpResponse> processor = AkkaStreamsUtil.flowToProcessor(Flow.<HttpRequest>create()
Processor<HttpRequest, HttpResponse> processor = PekkoStreamsUtil.flowToProcessor(Flow.<HttpRequest>create()
.mapAsync(4, new Function<HttpRequest, CompletionStage<String>>() {
public CompletionStage<String> apply(HttpRequest request) throws Exception {
return helper.extractBodyAsync(request);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.typesafe.netty.http;

import akka.japi.Pair;
import akka.japi.function.Creator;
import akka.stream.Materializer;
import akka.stream.javadsl.*;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.function.Creator;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.*;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class AkkaStreamsUtil {
public class PekkoStreamsUtil {

public static <In, Out> Processor<In, Out> flowToProcessor(Flow<In, Out, ?> flow, Materializer materializer) {
Pair<Subscriber<In>, Publisher<Out>> pair =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.typesafe.netty.http;

import akka.actor.ActorSystem;
import akka.japi.function.Function;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Flow;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.12</artifactId>
<version>${akka-stream.version}</version>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-stream_2.12</artifactId>
<version>${pekko-stream.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand All @@ -86,7 +86,7 @@
<properties>
<netty.version>4.1.97.Final</netty.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<akka-stream.version>2.6.21</akka-stream.version>
<pekko-stream.version>1.0.1</pekko-stream.version>
<maven-bundle-plugin.version>5.1.9</maven-bundle-plugin.version>
<maven-jar-plugin.version>3.3.0</maven-jar-plugin.version>
</properties>
Expand Down