Skip to content

Commit

Permalink
5.1 v1.5 将负载均衡策略通过注解注入,同时创建了随机负载均衡策略
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzzzzzyt committed May 3, 2022
1 parent a2b9a22 commit 0c7b51c
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 40 deletions.
2 changes: 2 additions & 0 deletions zyt-rpc-call/src/main/java/service/call/ServerCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@




import annotation.RpcMethodCluster;
import org.apache.zookeeper.KeeperException;
import service.bootstrap.ServerBootStrap;
Expand All @@ -11,6 +12,7 @@
//通用启动类 将启动的逻辑藏在ServerBootStrap中
//注解 看你像启动多少个服务和对应的方法
@RpcMethodCluster(method = {"Hello","Bye"},startNum = {2,3})

public class ServerCall {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchMethodException {
ServerBootStrap.start();
Expand Down
13 changes: 13 additions & 0 deletions zyt-rpc-common/src/main/java/annotation/LoadBalanceMethodImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

//注解的参数直接是要传入什么类
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface LoadBalanceMethodImpl {
Class chosenMethod();
}
1 change: 0 additions & 1 deletion zyt-rpc-common/src/main/java/exception/RpcException.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@
public class RpcException extends Exception{
public RpcException(){super();}
public RpcException(String message){super(message);}

}
47 changes: 47 additions & 0 deletions zyt-rpc-common/src/main/java/loadbalance/AccessBalance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package loadbalance;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class AccessBalance implements LoadBalance{
@Override
public String loadBalance(ZooKeeper zooKeeper, String path) throws InterruptedException, KeeperException {
List<String> children = zooKeeper.getChildren(path, false, null);
if (children.isEmpty())
{
System.out.println("当前没有服务器提供该服务 请联系工作人员");
}
//进行排序 根据每个节点的访问次数 从小到大进行排序 然后选用最小的
Collections.sort(children, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {

try {
return Integer.valueOf(new String(zooKeeper.getData(path+"/"+o1,false,null)))
-
Integer.valueOf(new String(zooKeeper.getData(path+"/"+o2,false,null)));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
}
});
//对选用的对象的访问量加1 todo 暂时不知道怎么让数据直接+1
// 获取节点数据+1,然后修改对应节点,
String chooseNode = children.get(0);
byte[] data = zooKeeper.getData(path+"/"+chooseNode, false, null);
int visitCount = Integer.valueOf(new String(data));
++visitCount;
//version参数用于指定节点的数据版本,表名本次更新操作是针对指定的数据版本进行的。 cas
zooKeeper.setData(path+"/"+chooseNode,String.valueOf(visitCount).getBytes(StandardCharsets.UTF_8),-1);
String address = new String(children.get(0));
return address;
}
}
13 changes: 13 additions & 0 deletions zyt-rpc-common/src/main/java/loadbalance/LoadBalance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package loadbalance;


import annotation.LoadBalanceMethodImpl;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

//实现不同的负载均衡策略
@LoadBalanceMethodImpl(chosenMethod = RandomBalance.class)
public interface LoadBalance {
//通过负载均衡策略返回相应地址
String loadBalance(ZooKeeper zooKeeper, String path) throws InterruptedException, KeeperException;
}
29 changes: 29 additions & 0 deletions zyt-rpc-common/src/main/java/loadbalance/RandomBalance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package loadbalance;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;

public class RandomBalance implements LoadBalance{
@Override
public String loadBalance(ZooKeeper zooKeeper, String path) throws InterruptedException, KeeperException {
List<String> children = zooKeeper.getChildren(path, null,null);
if (children.isEmpty())
{
System.out.println("当前没有服务器提供该服务 请联系工作人员");
}
int size = children.size();
Random random = new Random();
//这是应该处于0——size-1之间
int randomIndex = random.nextInt(size);
String chooseNode = children.get(randomIndex);
byte[] data = zooKeeper.getData(path + "/" + chooseNode, null, null);
int visitedCount = Integer.valueOf(new String(data));
++visitedCount;
zooKeeper.setData(path+"/"+ chooseNode, String.valueOf(visitedCount).getBytes(StandardCharsets.UTF_8),-1);
return chooseNode;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package consumer.zkService;


import annotation.LoadBalanceMethodImpl;
import loadbalance.LoadBalance;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
Expand All @@ -26,4 +28,15 @@ public void connect() throws IOException, InterruptedException, KeeperException
if (zooKeeper.exists("/test", null)==null)zooKeeper.create("/test","".getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/test/hello","12".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}


@Test
public void testInterface()
{
LoadBalanceMethodImpl annotation = LoadBalance.class.getAnnotation(LoadBalanceMethodImpl.class);
Class method = annotation.chosenMethod();

}
}


Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package consumer.zkService;

import annotation.LoadBalanceMethodImpl;
import constants.RpcConstants;
import consumer.nio.NIONonBlockingClient12;
import exception.RpcException;
import loadbalance.LoadBalance;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


public class ZkServiceDiscovery {
private static String connectString = RpcConstants.ZOOKEEPER_ADDRESS;
Expand All @@ -30,7 +31,7 @@ public void process(WatchedEvent watchedEvent) {
}

// 根据所请求的服务地址 获取对应的远端地址
public static String getMethodAddress(String methodName) throws RpcException, InterruptedException, KeeperException {
public static String getMethodAddress(String methodName) throws RpcException, InterruptedException, KeeperException, InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {

//判断节点中是否存在对应路径 不存在则抛出异常
if (zooKeeper.exists("/service/"+methodName,null)==null)
Expand All @@ -40,45 +41,19 @@ public static String getMethodAddress(String methodName) throws RpcException, In
}

String prePath = "/service/"+methodName;
//v1.3更新 使用软负载
//到对应节点中获取下面的子节点
List<String> children = zooKeeper.getChildren(prePath, false, null);
if (children.isEmpty())
{
System.out.println("当前没有服务器提供该服务 请联系工作人员");
}
//v1.5修改使用负载均衡策略 根据接口上注解选择的实现类进行调用
LoadBalanceMethodImpl annotation = LoadBalance.class.getAnnotation(LoadBalanceMethodImpl.class);
Class methodClass = annotation.chosenMethod();
Method method = methodClass.getMethod("loadBalance", new Class[]{ZooKeeper.class, String.class});
//被选中的负载均衡实现类的对象 通过反射执行 获取对应的地址
Object methodChosenClass = methodClass.newInstance();
String address = (String) method.invoke(methodChosenClass,zooKeeper,prePath);

//进行排序 根据每个节点的访问次数 从小到大进行排序 然后选用最小的
Collections.sort(children, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {

try {
return Integer.valueOf(new String(zooKeeper.getData(prePath+"/"+o1,false,null)))
-
Integer.valueOf(new String(zooKeeper.getData(prePath+"/"+o2,false,null)));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return 0;
}
});
//对选用的对象的访问量加1 todo 暂时不知道怎么让数据直接+1
// 获取节点数据+1,然后修改对应节点,
String chooseNode = children.get(0);
byte[] data = zooKeeper.getData(prePath+"/"+chooseNode, false, null);
int visitCount = Integer.valueOf(new String(data));
++visitCount;
//version参数用于指定节点的数据版本,表名本次更新操作是针对指定的数据版本进行的。 cas
zooKeeper.setData(prePath+"/"+chooseNode,String.valueOf(visitCount).getBytes(StandardCharsets.UTF_8),-1);
String address = new String(children.get(0));
return address;
}


public static String getStart(String methodName,String msg) throws IOException, RpcException, InterruptedException, KeeperException {
public static String getStart(String methodName,String msg) throws IOException, RpcException, InterruptedException, KeeperException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchMethodException {
//先进行连接
getConnect();
//获取相应的远端地址
Expand Down

0 comments on commit 0c7b51c

Please sign in to comment.