Skip to content

Commit

Permalink
5.6 v2.2 实现了对象的传输 用到了netty自带的编解码器 遇到了些bug都是自己粗心导致的 明天继续!
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzzyt committed May 6, 2022
1 parent 57af6d3 commit bf6be29
Show file tree
Hide file tree
Showing 22 changed files with 509 additions and 5 deletions.
18 changes: 16 additions & 2 deletions zyt-rpc-call/src/main/java/service/ClientCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,35 @@
import annotation.RpcToolsSelector;
import exception.RpcException;
import method.Customer;
import serialization.entity.Person;
import service.call.ChosenClientCall;

import java.io.IOException;

//总客户端启动类 用户调用 什么版本的 和用什么工具 使用什么注册中心 序列化的选择 都可以用这个来玩
//注册中心不能给过去 这样就是重复依赖了
@RpcClientBootStrap(version = "2.1")
@RpcClientBootStrap(version = "2.2")
@RpcToolsSelector(rpcTool = "Netty")
public class ClientCall {
public static void main(String[] args) throws RpcException, IOException, InterruptedException {
//实现调用
Customer customer = ChosenClientCall.start();

// long start = System.currentTimeMillis();

Person person = new Person("zz");
System.out.println(customer.GetPerson(person));
System.out.println(customer.GetName(new Person("祝英台")));
System.out.println(customer.GetName(new Person("祝英台")));
System.out.println(customer.GetName(new Person("祝英台")));
System.out.println(customer.Hello("success"));
System.out.println(customer.Bye("fail"));
System.out.println(customer.Hello("fail"));
System.out.println(customer.Bye("fail"));
System.out.println(customer.Bye("fail"));


// long end = System.currentTimeMillis();
// System.out.println(end-start);

}
}
5 changes: 3 additions & 2 deletions zyt-rpc-call/src/main/java/service/ServerCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

//总服务端启动类 用户调用 注解是 注册什么方法进去
//调用的是什么版本的服务端启动方法
@RpcMethodCluster(method = {"Hello","Bye"},startNum = {2,3})
@RpcServerBootStrap(version = "2.1")
//方法的注册名必须和对应的方法一一对应
@RpcMethodCluster(method = {"Hello","Bye","GetName","GetPerson"},startNum = {3,3,3,1})
@RpcServerBootStrap(version = "2.2")
@RpcToolsSelector(rpcTool = "Netty")
public class ServerCall {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import annotation.RpcClientBootStrap;
import consumer.bootstrap.netty.NettyConsumerBootStrap20;
import consumer.bootstrap.netty.NettyConsumerBootStrap21;
import consumer.bootstrap.netty.NettyConsumerBootStrap22;
import exception.RpcException;
import method.Customer;
import service.ClientCall;
Expand All @@ -25,6 +26,8 @@ private static Customer start0() throws InterruptedException, RpcException {
return null;
case "2.1":
return NettyConsumerBootStrap21.main(null);
case "2.2":
return NettyConsumerBootStrap22.main(null);
default:
System.out.println("该版本还没出呢,你如果有想法可以私信我,或者提个pr");
throw new RpcException("出现问题");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.zookeeper.KeeperException;
import provider.bootstrap.netty.NettyProviderBootStrap20;
import provider.bootstrap.netty.NettyProviderBootStrap21;
import provider.bootstrap.netty.NettyProviderBootStrap22;
import service.ServerCall;

import java.io.IOException;
Expand Down Expand Up @@ -49,6 +50,9 @@ public static void start() throws InterruptedException, IOException, KeeperExcep
case "2.1":
NettyProviderBootStrap21.main(new String[]{methodBuilder.toString(),numBuilder.toString()});
break;
case "2.2": //沿用 就是 做个区分 这个版本时进行序列化的测试
NettyProviderBootStrap22.main(new String[]{methodBuilder.toString(),numBuilder.toString()});
break;
default:
System.out.println("兄弟,该版本还在脑海中构思,如果你有想法可以pr给我");
}
Expand Down
13 changes: 13 additions & 0 deletions zyt-rpc-common/src/main/java/annotation/CodecSelector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

//根据该注解判断选择当遇到传对象的相应的方法时,采用什么编解码方式
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface CodecSelector {
String Codec() default "ObjectCodec";
}
92 changes: 92 additions & 0 deletions zyt-rpc-common/src/main/java/codec/AddCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package codec;

import annotation.CodecSelector;
import exception.RpcException;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import serialization.Serialization;

import java.lang.reflect.Method;

//公共类 根据对应选择的注解进行编码器的添加
public class AddCodec {

/**
*
* @param pipeline
* @param method 方法,来获取对应的输入输出类
* @throws RpcException
*/
public static void addCodec(ChannelPipeline pipeline, Method method,boolean isConsumer) throws RpcException {
//根据注解进行编解码器的选择
CodecSelector annotation = Serialization.class.getAnnotation(CodecSelector.class);

//目前而来我的传输 传入的参数都是一个 所以根据这一个传入和返回的参数的类型进行判断
//下面是我传入的参数 和传出的参数
Class<?> returnType = method.getReturnType();
Class<?> parameterType = method.getParameterTypes()[0];

String codec = annotation.Codec();
switch (codec)
{
case "ObjectCodec":
if (returnType!=String.class&&parameterType!=String.class)
{
pipeline.addLast(new ObjectEncoder());
//传的参是固定写法
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.weakCachingResolver(null)));
}
else if (returnType!=String.class&&parameterType==String.class)
{
//如果是客户端的话那么传出的是服务端传入的 所以这边编码那边就是解码
if (isConsumer)
{
//根据传入传出进行对应的编码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.weakCachingResolver(null)));
}
else
{
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new StringDecoder());
}

}
else if (returnType==String.class&&parameterType!=String.class)
{
//客户端 会对参数进行编码,服务端是解码
if (isConsumer)
{
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new StringDecoder());
}
else
{
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.weakCachingResolver(null)));
}
}
else
{
//因为传入参数和传出都是字符串类型 所以就传入字符串编解码器
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
}
return;
case "protobuf": //添加protobuf的编解码器
return;
case "kryo": //添加kryo的编解码器
return;
default: //如果都不是那就不加了
return;
}
}
}
5 changes: 5 additions & 0 deletions zyt-rpc-common/src/main/java/method/Customer.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package method;

import serialization.entity.Person;

//这是之后要被代理的对象 我们会实现它的方法
public interface Customer {
String Hello(String msg);
String Bye(String msg);
//加个简单的先试试 传送person 获取他的姓名
String GetName(Person person);
Person GetPerson(Person person);
}
7 changes: 7 additions & 0 deletions zyt-rpc-common/src/main/java/method/GetNameService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package method;

import serialization.entity.Person;

public interface GetNameService {
String sayGetName(Person person);
}
7 changes: 7 additions & 0 deletions zyt-rpc-common/src/main/java/method/GetPersonService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package method;

import serialization.entity.Person;

public interface GetPersonService {
Person sayGetPerson(Person person);
}
8 changes: 8 additions & 0 deletions zyt-rpc-common/src/main/java/serialization/Serialization.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package serialization;

import annotation.CodecSelector;

//直接选择默认方法
@CodecSelector()
public interface Serialization {
}
28 changes: 28 additions & 0 deletions zyt-rpc-common/src/main/java/serialization/entity/Person.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package serialization.entity;




import serialization.Serialization;

import java.io.Serializable;

//这是普通进行序列化传递需要
//千万注意要实现序列化接口

//接口一定要实现正确
public class Person implements Serializable {
private String name;

//构造方法
public Person(String name)
{
this.name = name;
}

public String getName()
{
return name;
}

}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package consumer.bootstrap.netty;


import consumer.proxy.RpcNettyClientProxy;
import method.Customer;

/*
以netty为网络编程框架的消费者端启动类
*/
//进行启动 提供类的方式即可
public class NettyConsumerBootStrap22 {
public static Customer main(String[] args) throws InterruptedException {
return (Customer) RpcNettyClientProxy.getBean(Customer.class);
}
}
10 changes: 10 additions & 0 deletions zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package consumer.netty;

import java.lang.reflect.Method;

//如果对应的是字符串类那就不用到下面的这个里面调用了
public class NettyClient {
public static Object callMethod(String hostName, int port, Object param, Method method) throws Exception {
return NettyClient22.callMethod(hostName, port, param,method);
}
}
70 changes: 70 additions & 0 deletions zyt-rpc-consumer/src/main/java/consumer/netty/NettyClient22.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package consumer.netty;


import codec.AddCodec;
import consumer.netty_client_handler.NettyClientHandler22;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import serialization.entity.Person;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

//实际客户端启动类 进行操作
//不确定能返回什么 所以判断是对象
public class NettyClient22 {

//线程池 实现异步调用
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

static NettyClientHandler22 clientHandler;

public static void initClient(String hostName,int port,Method method)
{

clientHandler = new NettyClientHandler22();
//建立客户端监听
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup workGroup = new NioEventLoopGroup();

try {
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

//加编解码器的逻辑,根据对应的注解进行编码器的添加 这里面有实现对应的逻辑 //
AddCodec.addCodec(pipeline,method,true);
pipeline.addLast(clientHandler);
}
});

//进行连接
bootstrap.connect(hostName, port).sync();

} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static Object callMethod(String hostName, int port, Object param, Method method) throws Exception {

//我是有多个地方进行调用的 不能只连接一个
initClient(hostName,port,method);
clientHandler.setParam(param);
//接下来这就有关系到调用 直接调用
return executor.submit(clientHandler).get();
}
}
Loading

0 comments on commit bf6be29

Please sign in to comment.