Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix grpc unittest #2956

Open
wants to merge 56 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
61cdb9a
init
fengye404 Jun 30, 2024
c1a9ea8
init
fengye404 Jun 30, 2024
610d121
init
fengye404 Jul 3, 2024
871b0d7
init
fengye404 Jul 7, 2024
f5732a6
init
fengye404 Jul 9, 2024
96dcd06
update: add arthasSample
fengye404 Jul 14, 2024
e3ec9e2
update: add MiniTemplator
fengye404 Jul 21, 2024
8761959
init
fengye404 Jul 23, 2024
0116fcd
update: add protobuf codec
fengye404 Jul 24, 2024
29eb16f
update: add protobuf codec
fengye404 Jul 28, 2024
faff33f
update: add protobuf codec
fengye404 Jul 29, 2024
900c89f
update: add protobuf codec
fengye404 Jul 31, 2024
8dd4c42
update: add protobuf codec
fengye404 Aug 1, 2024
bc81e98
update: add protobuf codec
fengye404 Aug 3, 2024
1640f19
update: add protobuf codec
fengye404 Aug 3, 2024
4d960ae
update: add protobuf codec
fengye404 Aug 4, 2024
f720222
update: add protobuf codec
fengye404 Aug 5, 2024
be1d85b
update: add protobuf codec
fengye404 Aug 7, 2024
5871052
update: add protobuf codec
fengye404 Aug 8, 2024
500e64a
update: add protobuf codec
fengye404 Aug 9, 2024
2248532
update: add grpc handler
fengye404 Aug 11, 2024
77abc18
update: add grpc handler
fengye404 Aug 12, 2024
ec65cb3
update: add grpc handler
fengye404 Aug 19, 2024
f55e872
update: add grpc handler
fengye404 Aug 27, 2024
5732edd
update: add grpc handler
fengye404 Sep 4, 2024
c8f9310
update: add grpc handler
fengye404 Sep 5, 2024
526cdd9
update: add grpc dispatcher
fengye404 Sep 9, 2024
cb669af
update: add grpc dispatcher
fengye404 Sep 12, 2024
bfee0f1
update: complete grpc dispatcher
fengye404 Sep 13, 2024
96251c5
update: add stream handler
fengye404 Sep 15, 2024
b2c289f
update: add stream handler
fengye404 Sep 16, 2024
a3426b7
update: Optimize the situation where multiple grpc bodies exist in th…
fengye404 Sep 18, 2024
893f7f3
update: add unit test
fengye404 Sep 19, 2024
f3050ab
update: formatter
fengye404 Sep 22, 2024
7aa63aa
update: add unit test and error process
fengye404 Sep 23, 2024
2061dea
update: add maven profiles
fengye404 Oct 10, 2024
e594712
update: add more unit test and log
fengye404 Oct 12, 2024
45b6027
update: Migration from jprotobuf serialization to protobuf:protoc; mi…
fengye404 Oct 14, 2024
b2a95f4
update: optimize unit tests
fengye404 Oct 15, 2024
850a9b8
update: optimize unit tests
fengye404 Oct 15, 2024
ea54ebe
update: remove custom protobuf serialization implementation, and add …
fengye404 Oct 20, 2024
7ae26fe
update: Refactored the gRPC execution part, dividing it into four typ…
fengye404 Oct 23, 2024
f63c4f8
update: reformat
fengye404 Oct 23, 2024
bde1171
update: add README.md
fengye404 Oct 23, 2024
c49c4ba
update: completed client stream
fengye404 Oct 24, 2024
ed45cdb
update: Implement data isolation for stream invoke
fengye404 Oct 26, 2024
06ddcff
update: Implement serverStream and biStream
fengye404 Oct 26, 2024
1cd9c05
Merge branch 'master' into grpc
fengye404 Oct 28, 2024
dbb2861
update: move maven module
fengye404 Oct 28, 2024
d21c574
update: Limit the maximum time for latch.await
fengye404 Nov 24, 2024
8a07354
Merge branch 'alibaba:master' into grpc
fengye404 Nov 24, 2024
fe425cb
update: Limit the maximum time for latch.await
fengye404 Nov 24, 2024
c58691e
Merge remote-tracking branch 'origin/grpc' into grpc
fengye404 Nov 24, 2024
133a148
update: add more log for unittest
fengye404 Nov 24, 2024
ec80656
update: modify unittest port
fengye404 Nov 24, 2024
8ba3740
update: modify unittest port
fengye404 Nov 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
public class ArthasGrpcBootstrap {
public static void main(String[] args) {
ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9090, null);
ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(9091, null);
arthasGrpcServer.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ArthasGrpcServer {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass().getName());

private int port = 9090;
private int port = 9091;

private String grpcServicePackageName;

Expand Down
92 changes: 50 additions & 42 deletions labs/arthas-grpc-server/src/test/java/unittest/grpc/GrpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@

import arthas.grpc.unittest.ArthasUnittest;
import arthas.grpc.unittest.ArthasUnittestServiceGrpc;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import com.taobao.arthas.grpc.server.ArthasGrpcServer;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -27,50 +29,59 @@
*/
public class GrpcTest {
private static final String HOST = "localhost";
private static final int PORT = 9090;
private static final int PORT = 9092;
private static final String HOST_PORT = HOST + ":" + PORT;
private static final String UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME = "unittest.grpc.service.impl";
private ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub blockingStub = null;
private static final Logger log = (Logger) LoggerFactory.getLogger(GrpcTest.class);
private ManagedChannel clientChannel;
Random random = new Random();
ExecutorService threadPool = Executors.newFixedThreadPool(10);


@Before
public void startServer() {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
Logger rootLogger = loggerContext.getLogger("ROOT");

rootLogger.setLevel(Level.INFO);

Thread grpcWebProxyStart = new Thread(() -> {
ArthasGrpcServer arthasGrpcServer = new ArthasGrpcServer(PORT, UNIT_TEST_GRPC_SERVICE_PACKAGE_NAME);
arthasGrpcServer.start();
});
grpcWebProxyStart.start();

clientChannel = ManagedChannelBuilder.forTarget(HOST_PORT)
.usePlaintext()
.build();
}

@Test
public void testUnary() {
ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT)
.usePlaintext()
.build();
log.info("testUnary start!");

ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel);

ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(clientChannel);

try {
ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("unaryInvoke").build();
ArthasUnittest.ArthasUnittestResponse res = stub.unary(request);
System.out.println(res.getMessage());
} finally {
channel.shutdownNow();
clientChannel.shutdownNow();
}
log.info("testUnary success!");
}

@Test
public void testUnarySum() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forTarget(HOST_PORT)
.usePlaintext()
.build();
log.info("testUnarySum start!");

ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(channel);
ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub = ArthasUnittestServiceGrpc.newBlockingStub(clientChannel);
for (int i = 0; i < 10; i++) {
AtomicInteger sum = new AtomicInteger(0);
int finalId = i;
for (int j = 0; j < 100; j++) {
for (int j = 0; j < 10; j++) {
int num = random.nextInt(101);
sum.addAndGet(num);
threadPool.submit(() -> {
Expand All @@ -82,17 +93,16 @@ public void testUnarySum() throws InterruptedException {
System.out.println("id:" + finalId + ",sum:" + sum.get() + ",grpcSum:" + grpcSum);
Assert.assertEquals(sum.get(), grpcSum);
}
channel.shutdown();
clientChannel.shutdown();
log.info("testUnarySum success!");
}

// 用于测试客户端流
@Test
public void testClientStreamSum() throws Throwable {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
log.info("testClientStreamSum start!");

ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);

AtomicInteger sum = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -115,25 +125,24 @@ public void onCompleted() {
}
});

for (int j = 0; j < 1000; j++) {
for (int j = 0; j < 100; j++) {
int num = random.nextInt(1001);
sum.addAndGet(num);
clientStreamObserver.onNext(ArthasUnittest.ArthasUnittestRequest.newBuilder().setNum(num).build());
}

clientStreamObserver.onCompleted();
latch.await();
channel.shutdown();
latch.await(20,TimeUnit.SECONDS);
clientChannel.shutdown();
log.info("testClientStreamSum success!");
}

// 用于测试请求数据隔离性
@Test
public void testDataIsolation() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
log.info("testDataIsolation start!");

ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {
AtomicInteger sum = new AtomicInteger(0);
Expand Down Expand Up @@ -170,23 +179,22 @@ public void onCompleted() {

clientStreamObserver.onCompleted();
try {
latch.await();
latch.await(20,TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
channel.shutdown();
clientChannel.shutdown();
});
}
Thread.sleep(7000L);
Thread.sleep(10000L);
log.info("testDataIsolation success!");
}

@Test
public void testServerStream() throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
log.info("testServerStream start!");

ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);

ArthasUnittest.ArthasUnittestRequest request = ArthasUnittest.ArthasUnittestRequest.newBuilder().setMessage("serverStream").build();

Expand All @@ -211,18 +219,17 @@ public void onCompleted() {
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.shutdown();
clientChannel.shutdown();
}
log.info("testServerStream success!");
}

// 用于测试双向流
@Test
public void testBiStream() throws Throwable {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
log.info("testBiStream start!");

ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(channel);
ArthasUnittestServiceGrpc.ArthasUnittestServiceStub stub = ArthasUnittestServiceGrpc.newStub(clientChannel);

CountDownLatch latch = new CountDownLatch(1);
StreamObserver<ArthasUnittest.ArthasUnittestRequest> biStreamObserver = stub.biStream(new StreamObserver<ArthasUnittest.ArthasUnittestResponse>() {
Expand Down Expand Up @@ -251,8 +258,9 @@ public void onCompleted() {

Thread.sleep(2000);
biStreamObserver.onCompleted();
latch.await();
channel.shutdown();
latch.await(20, TimeUnit.SECONDS);
clientChannel.shutdown();
log.info("testBiStream success!");
}

private void addSum(ArthasUnittestServiceGrpc.ArthasUnittestServiceBlockingStub stub, int id, int num) {
Expand Down
Loading