Skip to content

Commit

Permalink
4.26 RPC框架-基于nio v.1.0版
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzzyt committed Apr 26, 2022
1 parent ddcc9be commit 6309d0f
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package consumer.bootstrap;

import consumer.nio.NIOClient;

import java.io.IOException;

/*
以nio为网络编程框架的消费者端启动类
*/
public class NIOConsumerBootStrap {
public static void main(String[] args) throws IOException {
NIOClient.start("127.0.0.1",6666);
}
}
75 changes: 75 additions & 0 deletions zyt-rpc-consumer/src/main/java/consumer/nio/NIOClient.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,79 @@
package consumer.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;

public class NIOClient {
public static void start(String HostName, int PORT) throws IOException{
start0(HostName,PORT);
}

//真正启动在这
private static void start0(String hostName, int port) throws IOException {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
System.out.println("-----------服务消费方启动-------------");
socketChannel.configureBlocking(false);
//建立链接 非阻塞连接 但我们是要等他连接上
if (!socketChannel.connect(new InetSocketAddress(hostName,port))) {
while (!socketChannel.finishConnect());
}
//创建选择器 进行监听读事件
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
//创建匿名线程进行监听读事件
new Thread(new Runnable() {
@Override
public void run() {
while (true)
{
//捕获异常 监听读事件
try {
if (selector.select(1000)==0)
{
continue;
}
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext())
{
SelectionKey key = keyIterator.next();
ByteBuffer buffer = (ByteBuffer)key.attachment();
SocketChannel channel = (SocketChannel)key.channel();
int read = 1;
//用这个的原因是怕 多线程出现影响
StringBuffer stringBuffer = new StringBuffer();
while (read!=0)
{
buffer.clear();
read = channel.read(buffer);
stringBuffer.append(new String(buffer.array(),0,read));
}
System.out.println("收到服务端回信"+stringBuffer.toString());
keyIterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}).start();

//真正的业务逻辑 等待键盘上的输入 进行发送信息
Scanner scanner = new Scanner(System.in);
while (true)
{
int methodNum = scanner.nextInt();
String message = scanner.next();
String msg = new String(methodNum+"#"+message);
socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
System.out.println("消息发送");
}
}
}
22 changes: 12 additions & 10 deletions zyt-rpc-provider/src/main/java/provider/nio/NIOServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
import provider.api.ByeServiceImpl;
import provider.api.HelloServiceImpl;

import javax.sound.sampled.Port;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

Expand All @@ -23,14 +20,16 @@ public static void start(int PORT) throws IOException {
start0(PORT);
}

//TODO 当服务消费方下机时 保持开启状态

/*
真正启动的业务逻辑在这
因为这是简易版 那么先把异常丢出去
*/
private static void start0(int port) throws IOException {
//创建对应的服务器端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

System.out.println("-----------服务提供方启动-------------");
//开启一个选择器 将自己要
Selector selector = Selector.open();

Expand Down Expand Up @@ -76,8 +75,8 @@ private static void start0(int port) throws IOException {
//进行调用方法并返回
//获得信息
StringBuffer stringBuffer = new StringBuffer();
int read = 0;
while (read!=-1)
int read = 1;
while (read!=0)
{
//先清空 防止残留
buffer.clear();
Expand All @@ -88,6 +87,7 @@ private static void start0(int port) throws IOException {
//方法号和信息中间有个#进行分割
String msg = stringBuffer.toString();
String[] strings = msg.split("#");
String response;
if (strings.length<2)
{
//当出现传入错误的时候 报异常
Expand All @@ -97,12 +97,12 @@ private static void start0(int port) throws IOException {
if (strings[0].equals("1"))
{
HelloService helloService = new HelloServiceImpl();
helloService.sayHello(strings[1]);
response = helloService.sayHello(strings[1]);
}
else if (stringBuffer.charAt(0)==2)
else if (strings[0].equals("2"))
{
ByeService byeService = new ByeServiceImpl();
byeService.sayBye(strings[1]);
response = byeService.sayBye(strings[1]);
}
else
{
Expand All @@ -112,7 +112,9 @@ else if (stringBuffer.charAt(0)==2)
}
String responseMsg = "收到信息" + strings[1] + "来自" + socketChannel.socket().getRemoteSocketAddress();
System.out.println(responseMsg);
ByteBuffer responseBuffer = ByteBuffer.wrap(responseMsg.getBytes(StandardCharsets.UTF_8));
//将调用方法后获得的信息回显
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
//写回信息
socketChannel.write(responseBuffer);
}
keyIterator.remove();
Expand Down
Loading

0 comments on commit 6309d0f

Please sign in to comment.