diff --git "a/sig/rpc/talk/blog/cdubbo\347\232\204\350\256\276\350\256\241\346\200\235\350\267\257.md" "b/sig/rpc/talk/blog/cdubbo\347\232\204\350\256\276\350\256\241\346\200\235\350\267\257.md" new file mode 100644 index 0000000..4bd2bc6 --- /dev/null +++ "b/sig/rpc/talk/blog/cdubbo\347\232\204\350\256\276\350\256\241\346\200\235\350\267\257.md" @@ -0,0 +1,33 @@ +# cdubbo的设计思路 +## dubbo设计 +![dubbo-framework](pic/dubbo-framework.jpg) + +## cdubbo设计 +### 嵌入dubbo的思路 +通过实现dubbo中不同层次的接口,将cdubbo的实现类嵌入到dubbo流程中。 +同时cdubbo的实现类持有自己特有的资源(初始化流程中的资源),在调用流程中使用自己持有的资源做特定逻辑 + +### 层次设计思路 +#### 动静结合 +> (与Tomcat的mapper层设计思路相似) +cdubbo通过实现类将自身伪装成invoker等,注入到dubbo的调用流程中,但这些实现类并不直接持有资源。 +资源的持有交给同层级的Manager,通过静态域管理该层次的资源。 +在实现类执行动作的时候,将会从Manager静态资源管理器中,获取资源。 + +#### 资源插件化 +Manager静态资源管理器,使用插件化的方式,在初始化的时候可以装配不同的插件,从而加载不同的资源 + +#### cdubbo组件 +##### Filter +Hystrix:断路器功能 + +##### Registry +Artemis:注册中心 + +##### Router +Artemis:注册中心 + +##### Other +接入堡垒平台,用作堡垒测试 +接入Cat,进行异常的打点 +接入SOA,通过http直接访问soa服务 \ No newline at end of file diff --git "a/sig/rpc/talk/blog/dubbo\347\232\204invoke\346\265\201\347\250\213.md" "b/sig/rpc/talk/blog/dubbo\347\232\204invoke\346\265\201\347\250\213.md" new file mode 100644 index 0000000..fb82562 --- /dev/null +++ "b/sig/rpc/talk/blog/dubbo\347\232\204invoke\346\265\201\347\250\213.md" @@ -0,0 +1,319 @@ +# dubbo的invoke流程 +## 概要 +梳理Dubbo的调用链路流程,分析其设计理念和思路,以及阅读其中关键步骤的源码逻辑 + +## Review +> 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上设置Filter 拦截点。 + +调用流程主要围绕Protocol/Invoker/Filter三个接口进行 +```java +public interface Protocol { + + Exporter export(Invoker invoker) throws RpcException; + + Invoker refer(Class type, URL url) throws RpcException; + + void destroy(); +} + +``` +```java +public interface Invoker extends Node { + + Result invoke(Invocation invocation) throws RpcException; + + void destroy(); +} + +``` +```java +public interface Filter extends BaseFilter { + + Result invoke(Invoker invoker, Invocation invocation) throws RpcException; +} + +``` + +## 整体流程 +![dubbo-framework](pic/dubbo-framework.jpg) + +### 初始化流程 +1. 初始化配置相关 + +2. 初始化Protocol + +3. 初始化Invoker + +4. 初始化底层资源 + + +### 调用流程 +1. 调用Invoker,做一些逻辑 +2. 调用Filter,做一些逻辑 +3. 调用Invoker,做一些逻辑 +4. 调用底层资源 + +## 简化设计 +从一个Client开始...... + +### 1. 直接调用 +![invoker1](pic/invoker1.png) + +### 2. 责任链模式 +解决函数逻辑的水平拓展问题 +![invoker2](pic/invoker2.png) + +定义接口:Response invoke(Request request); +```java +public interface Invoker extends Node { + + Result invoke(Invocation invocation) throws RpcException; + + void destroy(); +} +``` + +### 3. 过滤器插件 +![invoker3](pic/invoker3.png) + +### 在重要的过程上设置拦截接口 +如果你要写个远程调用框架,那远程调用的过程应该有一个统一的拦截接口。如果你要写一个 ORM 框架,那至少 SQL 的执行过程,Mapping 过程要有拦截接口;如果你要写一个 Web 框架,那请求的执行过程应该要有拦截接口,等等。没有哪个公用的框架可以 Cover 住所有需求,允许外置行为,是框架的基本扩展方式。这样,如果有人想在远程调用前,验证下令牌,验证下黑白名单,统计下日志;如果有人想在 SQL 执行前加下分页包装,做下数据权限控制,统计下 SQL 执行时间;如果有人想在请求执行前检查下角色,包装下输入输出流,统计下请求量,等等,就可以自行完成,而不用侵入框架内部。拦截接口,通常是把过程本身用一个对象封装起来,传给拦截器链,比如:远程调用主过程为 invoke(),那拦截器接口通常为 invoke(Invocation),Invocation 对象封装了本来要执行过程的上下文,并且 Invocation 里有一个 invoke() 方法,由拦截器决定什么时候执行,同时,Invocation 也代表拦截器行为本身,这样上一拦截器的 Invocation 其实是包装的下一拦截器的过程,直到最后一个拦截器的 Invocation 是包装的最终的 invoke() 过程;同理,SQL 主过程为 execute(),那拦截器接口通常为 execute(Execution),原理一样。当然,实现方式可以任意,上面只是举例。 +```java +public interface Filter extends BaseFilter { + + Result invoke(Invoker invoker, Invocation invocation) throws RpcException; +} +``` +装饰模式/组合模式 +![invoker4](pic/invoker4.png) + +### 4. 领域模型的设计 +**重资源 -> 资源的管理 -> 生命周期** + +资源的管理: +共享、创建、释放、生命周期........ +链路治理、上下文、组装........ + +### 领域模型 +![tomcat](pic/tomcat.png) +在 Dubbo 的核心领域模型中: + +* Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。 +* Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。 +* Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。 + +```java +public interface Protocol { + + Exporter export(Invoker invoker) throws RpcException; + + Invoker refer(Class type, URL url) throws RpcException; + + void destroy(); +} +``` +![invoker5](pic/invoker5.png) +共享资源谁持有? +缓存在哪里存储? +线程安全? +单例还是New? + +### 服务域/实体域/会话域分离 + +任何框架或组件,总会有核心领域模型,比如:Spring 的 Bean,Struts 的 Action,Dubbo 的 Service,Napoli 的 Queue 等等。这个核心领域模型及其组成部分称为实体域,它代表着我们要操作的目标本身。实体域通常是线程安全的,不管是通过不变类,同步状态,或复制的方式。① + +服务域也就是行为域,它是组件的功能集,同时也负责实体域和会话域的生命周期管理, 比如 Spring 的 ApplicationContext,Dubbo 的 ServiceManager 等。服务域的对象通常会比较重,而且是线程安全的,并以单一实例服务于所有调用。② + +什么是会话?就是一次交互过程。会话中重要的概念是上下文,什么是上下文?比如我们说:“老地方见”,这里的“老地方”就是上下文信息。为什么说“老地方”对方会知道,因为我们前面定义了“老地方”的具体内容。所以说,上下文通常持有交互过程中的状态变量等。会话对象通常较轻,每次请求都重新创建实例,请求结束后销毁。简而言之:把元信息交由实体域持有③,把一次请求中的临时状态由会话域持有,由服务域贯穿整个过程。 + +> ① Invoker仅维护自己的状态,类似Actor设计模式 + ② Protocol初始化为单例,并且持有缓存Map + ③ Invoker中存储默认元信息,临时信息存储到Invocation + +### 4.1 资源的销毁 +由服务域管理资源的生命周期,并委托给实体域执行 + +```java +public interface Protocol { + + Exporter export(Invoker invoker) throws RpcException; + + Invoker refer(Class type, URL url) throws RpcException; + + void destroy(); +} +``` +```java +public interface Invoker extends Node { + + Result invoke(Invocation invocation) throws RpcException; + + void destroy(); +} +``` + +### 4.2 状态的监听 +#### 重要的状态的变更发送事件并留出监听接口 +这里先要讲一个事件和上面拦截器的区别,拦截器是干预过程的,它是过程的一部分,是基于过程行为的,而事件是基于状态数据的,任何行为改变的相同状态,对事件应该是一致的。事件通常是事后通知,是一个 Callback 接口,方法名通常是过去式的,比如 onChanged()。比如远程调用框架,当网络断开或连上应该发出一个事件,当出现错误也可以考虑发出一个事件,这样外围应用就有可能观察到框架内部的变化,做相应适应。 +```java +public interface InvokerListener { + + void referred(Invoker invoker) throws RpcException; + + void destroyed(Invoker invoker); +} +``` + +### 4.3 纵向拓展 +SPI,微内核,插件化 + +#### 微核插件式,平等对待第三方 +大凡发展的比较好的框架,都遵守微核的理念。Eclipse 的微核是 OSGi, Spring 的微核是 BeanFactory,Maven 的微核是 Plexus。通常核心是不应该带有功能性的,而是一个生命周期和集成容器,这样各功能可以通过相同的方式交互及扩展,并且任何功能都可以被替换。如果做不到微核,至少要平等对待第三方,即原作者能实现的功能,扩展者应该可以通过扩展的方式全部做到。原作者要把自己也当作扩展者,这样才能保证框架的可持续性及由内向外的稳定性。 +引入配置模块,担当微内核。并负责Protocol的初始化启动(Protocol再负责下层的初始化) + +### 4.4 完善的分层 +* config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类 + +* proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory + +* registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService + +* cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker为中心,扩展接口为 Cluster, Directory, Router, LoadBalance + +* monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService + +* protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter + +* exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer + +* transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec + +* serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool + +其他层次以插件的方式装配到config里,核心仍然是Protocol层。功能要么伪装成Invoker,要么在初始化时进行。 + +> 在 RPC 中,Protocol 是核心层,也就是只要有 Protocol + Invoker + Exporter 就可以完成非透明的 RPC 调用,然后在 Invoker 的主过程上 Filter 拦截点。 + +![dubbo-framework](pic/dubbo-framework.jpg) + +![image](pic/image.png) + +层级关系: +``` +-system +-config +-proxy +-registry +-cluster +-monitor +-protocol + -invoker + -exchange + -... +``` + +![invoker](pic/invoker.png) + +### 5. 异步调用 +```java +public interface Result extends Serializable { + + Result whenCompleteWithContext(BiConsumer fn); +} +``` + +### 重点关注点 +结合一开始的大概流程... + +#### protocol初始化流程 +初始化的时候做了什么?集群?注册发现?代理封装? + +资源如何加载?如何缓存?连接池?线程池? + +........ + +#### 2. invoke流程 +实际调用过程的流程?有哪些逻辑步骤? + +负载均衡?重试?监控?序列化? + +同步异步?流式调用? + +........ + +### Invoke流程解读 +![dubbo-extension](pic/dubbo-extension.jpg) + +### 调用流程 +![triple-invoke调用](pic/triple-invoke调用.png) + +### 源码分析 +关键Invoker的逻辑: + +AbstractInvoker: 3个步骤,(PR:https://github.com/apache/dubbo/pull/7952) + +具体逻辑在doInvoke中 + +##### FailoverClusterInvoker +当调用失败时,记录初始错误并重试其他调用程序(重试n次,这意味着最多将调用n个不同的调用程序)注意,重试会导致延迟。 故障转移 + +图中方法:list、route、select + +什么时候会重试?非biz异常,但粒度很粗....(Issue相关) + +##### ListenerInvokerWrapper +注册invoker的listener,并进行操作,装饰模式 + +改进:观察者模式+生命周期 + +##### FilterChainNode +实际上是Invoker,但内部保存了Filter + +Invoker装饰,Filter适配,两者的组合 + +##### DubboInvoker +可以看到:Invoker层持有下层的资源管理 + +接下来就是excahnge层的逻辑 + +##### TripleInvoker +直接持有底层netty资源,比较粗糙 + +### 总结 +### 1.层次结构 +层级关系: +``` +-system +-config +-proxy +-registry +-cluster +-monitor +-protocol + -invoker + -exchange + -... +``` + +### 2.调用流程 +![invoker](pic/invoker.png) + +### 3.设计模式 +责任链:Invoker + +适配器:适配到Invoker责任链中 + +装饰:对Invoker进行增强 + +......... + +### 4.设计优化 +Invoker调用链:整个系统的核心,插件化功能采用拓展接口的方式,各个模块功能比较明确 + +Lifecycle生命周期:不完全,没有完整的生命周期;对于生命周期的事件处理比较硬编码 + +Listener:装饰的比较硬编码,事件阶段不够全,拓展性不够好 + +Filter:采用装饰模型而非组合模型,拓展性较差;嵌套层级深,调试时链路不清晰 + +........ diff --git "a/sig/rpc/talk/blog/dubbo\347\232\204\345\220\257\345\212\250\346\265\201\347\250\213.md" "b/sig/rpc/talk/blog/dubbo\347\232\204\345\220\257\345\212\250\346\265\201\347\250\213.md" new file mode 100644 index 0000000..2ef72ef --- /dev/null +++ "b/sig/rpc/talk/blog/dubbo\347\232\204\345\220\257\345\212\250\346\265\201\347\250\213.md" @@ -0,0 +1,423 @@ +# dubbo的启动流程 +> 最近有人在Dubbo社区中反馈Dubbo的启动耗时特别长,消费端订阅了265个Dubbo服务,花了近4分钟。为了解决这个问题,对整个Dubbo的启动流程进行了梳理。 + +从整体上来看Dubbo的启动流程一共分为两个部分,见下图: +![Dubbo启动流程概览](pic/Dubbo启动流程概览.png) +本文将重点从以上两个部分来讲述Dubbo的整个启动流程。 +## 启动入口 +Dubbo常用的启动入口分为3种方式,分别是XML方式、API方式和Annotation方式。 ++ XML方式 + + 为了更好的讲清楚这部分的内容,将其细分为3部分: + * Spring与Dubbo基于XML的初始化流程 + + XML方式在设计之初就已经和Spring的整个Bean生命周期进行了整合,我们可以通过对照Spring中Bean的生命周期来看看Dubbo是如何嵌入进来的 + ![Dubbo_XML初始化流程](pic/Dubbo_XML初始化流程.png) + + * Dubbo中XML初始化实现细节 + 具体聚焦到Dubbo的内部实现,通过调用关系可以更加详细的了解到具体的实现细节 + ``` java + org.apache.dubbo.config.spring.schema.DubboNamespaceHandler.parse() + ├─ org.springframework.beans.factory.xml.NamespaceHandlerSupport.parse() + │ └─ org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser.parse() + │ └─ org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser.parse() + └─ org.apache.dubbo.config.spring.beans.factory.config.ConfigurableSourceBeanMetadataElement.setSource() + + ``` + + * Dubbo中部分代码片段 + - DubboNamespaceHandler + ```java + public void init() { + registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); + registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); + registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); + registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true)); + registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true)); + registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); + registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true)); + registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true)); + registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); + registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); + registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); + registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); + registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, true)); + registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser()); + } + ``` + + - DubboBeanUtils + ```java + public static void registerCommonBeans(BeanDefinitionRegistry registry) { + + // Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean + registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME, + ReferenceAnnotationBeanPostProcessor.class); + + // Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093 + registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME, + DubboConfigAliasPostProcessor.class); + + // Since 2.7.9 Register DubboApplicationListenerRegister as an infrastructure Bean + // https://github.com/apache/dubbo/issues/6559 + + // Since 2.7.5 Register DubboLifecycleComponentApplicationListener as an infrastructure Bean + // registerInfrastructureBean(registry, DubboLifecycleComponentApplicationListener.BEAN_NAME, + // DubboLifecycleComponentApplicationListener.class); + + // Since 2.7.4 Register DubboBootstrapApplicationListener as an infrastructure Bean + // registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, + // DubboBootstrapApplicationListener.class); + + registerInfrastructureBean(registry, DubboApplicationListenerRegistrar.BEAN_NAME, + DubboApplicationListenerRegistrar.class); + + // Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean + registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME, + DubboConfigDefaultPropertyValueBeanPostProcessor.class); + + // Since 2.7.9 Register DubboConfigEarlyInitializationPostProcessor as an infrastructure Bean + registerInfrastructureBean(registry, DubboConfigEarlyInitializationPostProcessor.BEAN_NAME, + DubboConfigEarlyInitializationPostProcessor.class); + } + + ``` + ++ API方式 + API方式是最简单的,我们可以参考Dubbo中的demo,具体如下: + + * Provider + ```java + ServiceConfig service = new ServiceConfig<>(); + service.setInterface(DemoService.class); + service.setRef(new DemoServiceImpl()); + // 创建DubboBootstrap实例 + DubboBootstrap bootstrap = DubboBootstrap.getInstance(); + bootstrap.application(new ApplicationConfig("dubbo-demo-api-provider")) + .registry(new RegistryConfig("zookeeper://127.0.0.1:2181")) + .service(service) + .start()//调用start方法启动 + .await(); + ``` + + * Consumer + ```java + ReferenceConfig reference = new ReferenceConfig<>(); + reference.setInterface(DemoService.class); + reference.setGeneric("true"); + // 创建DubboBootstrap实例 + DubboBootstrap bootstrap = DubboBootstrap.getInstance(); + bootstrap.application(new ApplicationConfig("dubbo-demo-api-consumer")) + .registry(new RegistryConfig("zookeeper://127.0.0.1:2181")) + .reference(reference) + .start(); + + DemoService demoService = ReferenceConfigCache.getCache().get(reference); + String message = demoService.sayHello("dubbo"); + System.out.println(message); + + // generic invoke + GenericService genericService = (GenericService) demoService; + Object genericInvokeResult = genericService.$invoke("sayHello", new String[] { String.class.getName() }, + new Object[] { "dubbo generic invoke" }); + System.out.println(genericInvokeResult); + ``` + ++ Annotation方式 + 目前使用@DubboService或者@DubboReference来发布服务或者订阅服务是最常用的使用方式,具体通过以下3个方面来讲解: + + * Spring与Dubbo基于Annotation的初始化流程 + ![Dubbo_Annotation初始化流程](pic/Dubbo_Annotation初始化流程.png) + + * Dubbo中Annotation初始化实现细节 + - Provider + ``` + org.apache.dubbo.config.spring.beans.factory.annotation.ServiceClassPostProcessor.postProcessBeanDefinitionRegistry() + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ServiceClassPostProcessor.resolvePackagesToScan() + └─ org.apache.dubbo.config.spring.beans.factory.annotation.ServiceClassPostProcessor.registerServiceBeans() + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ServiceClassPostProcessor.findServiceBeanDefinitionHolders() + └─ org.apache.dubbo.config.spring.beans.factory.annotation.ServiceClassPostProcessor.registerServiceBean() + + ``` + + - Consumer + ``` + org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.doGetInjectedBean() + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.buildReferencedBeanName()【传统命名】 + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.getReferenceBeanName()【Bean命名】 + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.buildReferenceBeanIfAbsent() + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.prepareReferenceBean() + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.registerReferenceBean() + ├─ org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor.cacheInjectedReferenceBean() + └─ org.apache.dubbo.config.ReferenceConfig.get() + └─ org.apache.dubbo.config.ReferenceConfig.init + └─ org.apache.dubbo.config.bootstrap.DubboBootstrap.getInstance + └─ NEW org.apache.dubbo.config.bootstrap.DubboBootstrap() + + ``` + + + * Dubbo中部分代码片段 + - ServiceClassPostProcessor + ```java + private static final List> serviceAnnotationTypes = asList( + // @since 2.7.7 Add the @DubboService , the issue : https://github.com/apache/dubbo/issues/6007 + DubboService.class, + // @since 2.7.0 the substitute @com.alibaba.dubbo.config.annotation.Service + Service.class, + // @since 2.7.3 Add the compatibility for legacy Dubbo's @Service , the issue : https://github.com/apache/dubbo/issues/4330 + com.alibaba.dubbo.config.annotation.Service.class + ); + + ``` + ```java + public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { + + // @since 2.7.5 + registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);// 注册监听事件 + // 处理dubbo注解中使用到的placeholder + Set resolvedPackagesToScan = resolvePackagesToScan(packagesToScan); + + if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) { + registerServiceBeans(resolvedPackagesToScan, registry); + } else { + if (logger.isWarnEnabled()) { + logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!"); + } + } + + } + + ``` + ```java + private void registerServiceBeans(Set packagesToScan, BeanDefinitionRegistry registry) { + + DubboClassPathBeanDefinitionScanner scanner = + new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader); + + BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry); + + scanner.setBeanNameGenerator(beanNameGenerator); + + // refactor @since 2.7.7 + serviceAnnotationTypes.forEach(annotationType -> { + scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType)); + }); + + for (String packageToScan : packagesToScan) { + + // Registers @Service Bean first + scanner.scan(packageToScan); + + // Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not. + // 扫描所有拥有@Service、@DubboService的注解 + Set beanDefinitionHolders = + findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator); + ... + + } + + } + + ``` + + - ReferenceAnnotationBeanPostProcessor + ```java + public ReferenceAnnotationBeanPostProcessor() { + super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class); + } + + ``` + ```java + protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class injectedType, + InjectionMetadata.InjectedElement injectedElement) throws Exception { + /** + * The name of bean that annotated Dubbo's {@link Service @Service} in local Spring {@link ApplicationContext} + */ + String referencedBeanName = buildReferencedBeanName(attributes, injectedType); + + /** + * The name of bean that is declared by {@link Reference @Reference} annotation injection + */ + String referenceBeanName = getReferenceBeanName(attributes, injectedType); + + referencedBeanNameIdx.computeIfAbsent(referencedBeanName, k -> new TreeSet()).add(referenceBeanName); + + ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType); + + boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes); + + prepareReferenceBean(referencedBeanName, referenceBean, localServiceBean); + + registerReferenceBean(referencedBeanName, referenceBean, localServiceBean, referenceBeanName); + + cacheInjectedReferenceBean(referenceBean, injectedElement); + + return getBeanFactory().applyBeanPostProcessorsAfterInitialization(referenceBean.get(), referenceBeanName); + } + ``` + +## 核心启动流程 + +不论是采用哪种启动方式,最后都会进入Dubbo的核心启动流程。核心启动流程一共分为2类,分别是Service核心启动流程和Reference核心启动流程。 + ++ Service核心启动流程 + - 代码参考[DubboBootstrap#start](https://github.com/apache/dubbo/blob/master/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java#L877) + + - [启动流程](https://www.processon.com/view/link/60b51102f346fb669dd89c6c#map) + + ![Service核心启动流程](pic/Service核心启动流程.png) + ++ Reference核心启动流程 + - 代码参考[ReferenceConfig#init](https://github.com/apache/dubbo/blob/master/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java#L232) + - [启动流程](https://www.processon.com/view/link/60b51da8e0b34d6d5701489a) + + ![Reference核心启动流程](pic/Reference核心启动流程.png) + + 实现细节 + ``` + └─ org.apache.dubbo.config.spring.context.DubboApplicationListenerRegistrar.createDubboLifecycleComponentApplicationListener() + + /* DubboBootstrap主流程 */ + com.alibaba.spring.context.OnceApplicationContextEventListener.onApplicationEvent() + └─ org.apache.dubbo.config.spring.context.DubboBootstrapApplicationListener.onApplicationContextEvent() + └─ org.apache.dubbo.config.spring.context.DubboBootstrapApplicationListener.onContextRefreshedEvent() + └─ org.apache.dubbo.config.bootstrap.DubboBootstrap.start() + │ + ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.initialize() + │ ├─ org.apache.dubbo.rpc.model.ApplicationModel.initFrameworkExts() + │ │ └─ org.apache.dubbo.common.config.Environment.initialize() + │ ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.startConfigCenter()【★启动配置中心】 + │ │ ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.prepareEnvironment() + │ │ │ ├─ org.apache.dubbo.common.config.configcenter.DynamicConfiguration.getDynamicConfiguration() + │ │ │ │ └─ org.apache.dubbo.common.config.configcenter.AbstractDynamicConfigurationFactory.getDynamicConfiguration() + │ │ │ │ └─ org.apache.dubbo.common.config.configcenter.AbstractDynamicConfigurationFactory.createDynamicConfiguration() + │ │ │ │ └─ NEW org.apache.dubbo.configcenter.support.zookeeper.ZookeeperDynamicConfiguration() + │ │ │ ├─ org.apache.dubbo.common.config.Environment.setConfigCenterFirst() + │ │ │ ├─ org.apache.dubbo.common.config.Environment.updateExternalConfigurationMap() + │ │ │ └─ org.apache.dubbo.common.config.Environment.updateAppExternalConfigurationMap() + │ │ └─ org.apache.dubbo.config.context.ConfigManager.refreshAll() + │ ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.loadRemoteConfigs() + │ ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.checkGlobalConfigs() + │ ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.startMetadataCenter()【★启动元数据中心】 + │ │ └─ org.apache.dubbo.metadata.report.MetadataReportInstance.init() + │ │ └─ org.apache.dubbo.metadata.report.support.AbstractMetadataReportFactory.getMetadataReport() + │ │ └─ org.apache.dubbo.metadata.report.support.AbstractMetadataReportFactory.createMetadataReport() + │ │ └─ NEW org.apache.dubbo.metadata.store.zookeeper.ZookeeperMetadataReport() + │ └─ org.apache.dubbo.config.bootstrap.DubboBootstrap.initMetadataService() + │ + ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.exportServices()【★注册服务】 + │ └─ org.apache.dubbo.config.bootstrap.DubboBootstrap.exportService() + │ └─ org.apache.dubbo.config.ServiceConfig.export() + │ ├─ org.apache.dubbo.config.ServiceConfig.doExport() + │ │ └─ org.apache.dubbo.config.ServiceConfig.doExportUrls() + │ │ └─ org.apache.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol() + │ │ ├─ org.apache.dubbo.config.ServiceConfig.exportLocal()【★暴露到本地injvm】 + │ │ │ └─ org.apache.dubbo.rpc.Protocol.export() + │ │ │ └─ org.apache.dubbo.qos.protocol.QosProtocolWrapper.export() + │ │ │ └─ org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export() + │ │ │ └─ org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export() + │ │ │ └─ org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol.export() + │ │ │ └─ NEW org.apache.dubbo.rpc.protocol.injvm.InjvmExporter() + │ │ ├─ org.apache.dubbo.rpc.Protocol.export()【★暴露到remote】 + │ │ │ └─ org.apache.dubbo.qos.protocol.QosProtocolWrapper.export() + │ │ │ ├─ org.apache.dubbo.qos.protocol.QosProtocolWrapper.startQosServer()【★启动本地QOS服务】 + │ │ │ └─ org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export() + │ │ │ └─ org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export() + │ │ │ └─ org.apache.dubbo.registry.integration.RegistryProtocol.export() + │ │ │ ├─ org.apache.dubbo.registry.integration.RegistryProtocol.doLocalExport()【★启动本地netty】 + │ │ │ │ └─ org.apache.dubbo.qos.protocol.QosProtocolWrapper.export() + │ │ │ │ └─ org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.export() + │ │ │ │ └─ org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.export() + │ │ │ │ └─ org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.export() + │ │ │ │ └─ org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.openServer() + │ │ │ │ └─ org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol.createServer() + │ │ │ ├─ org.apache.dubbo.registry.integration.RegistryProtocol.getRegistry()【★获取注册中心】 + │ │ │ │ └─ org.apache.dubbo.registry.integration.RegistryProtocol.getRegistry() + │ │ │ │ └─ org.apache.dubbo.registry.RegistryFactory.getRegistry() + │ │ │ │ └─ org.apache.dubbo.registry.support.AbstractRegistryFactory.getRegistry() + │ │ │ │ └─ org.apache.dubbo.registry.support.AbstractRegistryFactory.createRegistry()【★初始化注册中心】 + │ │ │ ├─ org.apache.dubbo.registry.RegistryService.register()【★注册服务】 + │ │ │ │ └─ org.apache.dubbo.registry.ListenerRegistryWrapper.register() + │ │ │ │ └─ org.apache.dubbo.registry.support.FailbackRegistry.register() + │ │ │ │ ├─ org.apache.dubbo.registry.support.AbstractRegistry.register() + │ │ │ │ └─ org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister() + │ │ │ ├─ org.apache.dubbo.registry.integration.RegistryProtocol.registerStatedUrl() + │ │ │ └─ org.apache.dubbo.registry.RegistryService.subscribe()【★订阅configuration】 + │ │ │ └─ org.apache.dubbo.registry.ListenerRegistryWrapper.subscribe() + │ │ │ └─ org.apache.dubbo.registry.support.FailbackRegistry.subscribe() + │ │ │ ├─ org.apache.dubbo.registry.support.AbstractRegistry.subscribe() + │ │ │ └─ org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe() + │ │ └─ org.apache.dubbo.registry.client.metadata.MetadataUtils.publishServiceDefinition() + │ └─ org.apache.dubbo.config.ServiceConfig.exported() + │ └─ org.apache.dubbo.config.spring.ServiceBean.exported() + │ ├─ org.apache.dubbo.config.ServiceConfig.exported() + │ └─ org.apache.dubbo.config.spring.ServiceBean.publishExportEvent() + │ + ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.exportMetadataService() + │ + ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.registerServiceInstance() + │ + ├─ org.apache.dubbo.config.bootstrap.DubboBootstrap.referServices()【★订阅服务】 + │ └─ org.apache.dubbo.config.utils.ReferenceConfigCache.get() + │ └─ org.apache.dubbo.config.ReferenceConfig.get()【★初始化消费者】 + │ └─ org.apache.dubbo.config.ReferenceConfig.init() + │ └─ org.apache.dubbo.config.ReferenceConfig.createProxy() + │ └─ org.apache.dubbo.rpc.Protocol.refer() + │ └─ org.apache.dubbo.qos.protocol.QosProtocolWrapper.refer() + │ ├─ org.apache.dubbo.qos.protocol.QosProtocolWrapper.startQosServer() + │ └─ org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper.refer() + │ └─ org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper.refer() + │ └─ org.apache.dubbo.registry.integration.RegistryProtocol.refer() + │ ├─ org.apache.dubbo.registry.integration.RegistryProtocol.getRegistry() + │ └─ org.apache.dubbo.registry.integration.RegistryProtocol.doRefer() + │ ├─ org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.getMigrationInvoker() + │ └─ org.apache.dubbo.registry.integration.RegistryProtocol.interceptInvoker() + │ └─ org.apache.dubbo.registry.client.migration.MigrationRuleListener.onRefer() + │ └─ org.apache.dubbo.registry.client.migration.MigrationRuleHandler.doMigrate() + │ └─ org.apache.dubbo.registry.client.migration.MigrationInvoker.migrateToServiceDiscoveryInvoker() + │ ├─ org.apache.dubbo.registry.client.migration.MigrationInvoker.refreshServiceDiscoveryInvoker()【★处理按节点订阅】 + │ │ └─ org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.getServiceDiscoveryInvoker() + │ │ └─ org.apache.dubbo.registry.integration.RegistryProtocol.doCreateInvoker() + │ │ ├─ org.apache.dubbo.registry.ListenerRegistryWrapper.register() + │ │ │ └─ org.apache.dubbo.registry.client.ServiceDiscoveryRegistry.register() + │ │ └─ org.apache.dubbo.registry.integration.DynamicDirectory.subscribe() + │ │ └─ org.apache.dubbo.registry.ListenerRegistryWrapper.subscribe() + │ │ └─ org.apache.dubbo.registry.client.ServiceDiscoveryRegistry.subscribe() + │ │ └─ org.apache.dubbo.registry.client.ServiceDiscoveryRegistry.doSubscribe() + │ └─ org.apache.dubbo.registry.client.migration.MigrationInvoker.refreshInterfaceInvoker()【★处理按接口订阅】 + │ └─ org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.getInvoker() + │ └─ org.apache.dubbo.registry.integration.RegistryProtocol.doCreateInvoker() + │ ├─ org.apache.dubbo.registry.ListenerRegistryWrapper.register()【★注册消费者信息】 + │ │ └─ org.apache.dubbo.registry.support.FailbackRegistry.register() + │ │ ├─ org.apache.dubbo.registry.support.AbstractRegistry.register() + │ │ └─ org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister() + │ └─ org.apache.dubbo.registry.integration.RegistryDirectory.subscribe()【★订阅服务】 + │ └─ org.apache.dubbo.registry.ListenerRegistryWrapper.subscribe() + │ └─ org.apache.dubbo.registry.support.FailbackRegistry.subscribe() + │ ├─ org.apache.dubbo.registry.support.AbstractRegistry.subscribe() + │ └─ org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe() + └─ 【处理异步注册及订阅】 + + ``` + +## Dubbo耗时分析 +[分析结果](https://www.yuque.com/pinxiong/analysis/vta1u9) + + +## 改进建议 +* 可并行 + * 多协议时可以考虑将ServiceConfig#doExportUrlsFor1Protocol并行处理 + * AbstractRegistry#notify()可以并行处理 + +* 可异步 + * MetadataUtils#publishServiceDefinition() + * MonitorService可以考虑异步加载,不建议在MonitorFilter中懒加载 + + QosProtocolWrapper#startQosServer可以考虑异步加载 + +* 预加载 + * DubboShutdownHook#getDubboShutdownHook()#register() + * ShutdownHookCallbacks#INSTANCE#addCallback(DubboBootstrap.this::destroy) + + + diff --git "a/sig/rpc/talk/blog/dubbo\347\232\204\350\264\237\350\275\275\345\235\207\350\241\241\347\255\226\347\225\245.md" "b/sig/rpc/talk/blog/dubbo\347\232\204\350\264\237\350\275\275\345\235\207\350\241\241\347\255\226\347\225\245.md" new file mode 100644 index 0000000..4b73142 --- /dev/null +++ "b/sig/rpc/talk/blog/dubbo\347\232\204\350\264\237\350\275\275\345\235\207\350\241\241\347\255\226\347\225\245.md" @@ -0,0 +1,376 @@ +# dubbo的负载均衡策略 +## loadbalance简介 + +LoadBalance 的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。 + +## loadbalance调用流程 +```java + org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke() + |_org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.list() + |_org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.initLoadBalance() + |_org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke() + |_org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.select() + |_org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.doSelect() + |_org.apache.dubbo.rpc.cluster.LoadBalance.select() +``` +截止dubbo3.0版本,目前共有五种负载均衡策略 +```java +random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance 加权随机负载均衡 +roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance 加权轮询负载均衡 +leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance 最小活跃数负载均衡 +consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance 一致性哈希负载均衡 +shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance 最短响应时间负载均衡 +``` + +### RandomLoadBalance +* 简介: + +RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上 +![随机负载均衡](pic/随机负载均衡.png) + +* demo:以org.apache.dubbo.demo包下的示例为参考 +```java +public interface DemoService { + + String sayHello(String name); + +} +``` + +```java +public class DemoServiceImpl implements DemoService { + private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class); + + @Override + public String sayHello(String name) { + logger.info("Hello " + name + ", request from consumer: " + RpcContext.getServiceContext().getRemoteAddress()); + return "Hello " + name + ", response from provider: " + RpcContext.getServiceContext().getLocalAddress(); + } + +} +``` +启动两个provider +provider1: +```java +public class Provider { + + public static void main(String[] args) throws Exception { + ServiceConfig service = new ServiceConfig<>(); + service.setInterface(DemoService.class); + service.setRef(new DemoServiceImpl()); + setLoadBalance(service, "random", 200); + + ProtocolConfig protocolConfig = setProtocolConfig(111); + + DubboBootstrap bootstrap = DubboBootstrap.getInstance(); + bootstrap.application(new ApplicationConfig("dubbo-demo-api-provider")) + .registry(new RegistryConfig("zookeeper://127.0.0.1:2181")) + .service(service) + .protocol(protocolConfig) + .start() + .await(); + + System.out.println("Server success"); + } + + private static ProtocolConfig setProtocolConfig(int port) { + ProtocolConfig protocolConfig = new ProtocolConfig(); + protocolConfig.setPort(port); + return protocolConfig; + } + + private static void setLoadBalance(ServiceConfig serviceConfig, String strategy, Integer weight) { + serviceConfig.setLoadbalance(strategy); + serviceConfig.setWeight(weight); + } +} +``` +provider2: +```java +public class Provider { + + public static void main(String[] args) throws Exception { + ServiceConfig service = new ServiceConfig<>(); + service.setInterface(DemoService.class); + service.setRef(new DemoServiceImpl()); + setLoadBalance(service, "random", 400); + + ProtocolConfig protocolConfig = setProtocolConfig(222); + + DubboBootstrap bootstrap = DubboBootstrap.getInstance(); + bootstrap.application(new ApplicationConfig("dubbo-demo-api-provider")) + .registry(new RegistryConfig("zookeeper://127.0.0.1:2181")) + .service(service) + .protocol(protocolConfig) + .start() + .await(); + + System.out.println("Server success"); + } + + private static ProtocolConfig setProtocolConfig(int port) { + ProtocolConfig protocolConfig = new ProtocolConfig(); + protocolConfig.setPort(port); + return protocolConfig; + } + + private static void setLoadBalance(ServiceConfig serviceConfig, String strategy, Integer weight) { + serviceConfig.setLoadbalance(strategy); + serviceConfig.setWeight(weight); + } +} +``` +consumer +```java +public class Consumer { + public static void main(String[] args) throws IOException { + ReferenceConfig reference = new ReferenceConfig<>(); + reference.setInterface(DemoService.class); + reference.setGeneric("true"); + + DubboBootstrap bootstrap = DubboBootstrap.getInstance(); + bootstrap.application(new ApplicationConfig("dubbo-demo-api-consumer")) + .registry(new RegistryConfig("zookeeper://127.0.0.1:2181")) + .reference(reference) + .start(); + + DemoService demoService = ReferenceConfigCache.getCache().get(reference); + + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + + while (true) { + String consoleContent = br.readLine(); + if ("quit".equalsIgnoreCase(consoleContent)) { + break; + } + + String message = demoService.sayHello(consoleContent); + System.out.println(message); + } + } +} +``` +result: +```java +Hello 1, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:222 +11 +Hello 11, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:222 +``` + +* 源码分析: +![随机负载均衡源码1](pic/随机负载均衡源码1.png) +![随机负载均衡源码2](pic/随机负载均衡源码2.png) + +### RoundRobinLoadBalance +* 简介: +经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。 + +* demo(修改Provider类中main方法): +provider1: +```java +setLoadBalance(service, "roundrobin", 200); +ProtocolConfig protocolConfig = setProtocolConfig(111); +``` +provider2: +```java +setLoadBalance(service, "roundrobin", 200); +ProtocolConfig protocolConfig = setProtocolConfig(222); +``` +provider3: +```java +setLoadBalance(service, "roundrobin", 400); +ProtocolConfig protocolConfig = setProtocolConfig(333); +``` +result: +```java +1 +Hello 1, response from provider: 10.32.114.45:333 +1 +Hello 1, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:333 +1 +Hello 1, response from provider: 10.32.114.45:333 +1 +Hello 1, response from provider: 10.32.114.45:222 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:333 +``` +* 源码分析: +![加权轮询1](pic/加权轮询1.png) +![加权轮询2](pic/加权轮询2.png) + +逻辑梳理示意: +![加权轮询3](pic/加权轮询3.png) + +### LeastActiveLoadBalance + +* 简介: +活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求。活跃数相同的情况下,采用加权随机算法。 +* demo(修改Provider类中main方法): +provider1: +```java + setLoadBalance(service, "leastactive", 200); + ProtocolConfig protocolConfig = setProtocolConfig(36454); +``` +provider2: +```java + setLoadBalance(service, "leastactive", 400); + ProtocolConfig protocolConfig = setProtocolConfig(39515); +``` +consumer添加如下代码: +```java + reference.setActives(1000); + reference.setCheck(false); +``` +result: +```java +1 +Hello 1, response from provider: 10.32.114.45:39515 +1 +Hello 1, response from provider: 10.32.114.45:36454 +1 +Hello 1, response from provider: 10.32.114.45:39515 +1 +Hello 1, response from provider: 10.32.114.45:39515 +1 +Hello 1, response from provider: 10.32.114.45:39515 +1 +Hello 1, response from provider: 10.32.114.45:36454 +``` +* 源码分析: +![活跃调用1](pic/活跃调用1.png) +![活跃调用2](pic/活跃调用2.png) +![活跃调用3](pic/活跃调用3.png) +![活跃调用4](pic/活跃调用4.png) + +### ShortestResponseLoadBalance +* 简介: +与LeastActiveLoadBalance相似。不同的是,统计的数据不是活跃数active,而是最短响应时间 succeededAverageElapsed(成功请求的平均响应耗时) * active。 +* demo: + +provider1: +```java + setLoadBalance(service, "shortestresponse", 200); + ProtocolConfig protocolConfig = setProtocolConfig(37885); +``` +provider2: +```java + setLoadBalance(service, "shortestresponse", 400); + ProtocolConfig protocolConfig = setProtocolConfig(36418); +``` +consumer添加如下代码: +```java + reference.setActives(1000); + reference.setCheck(false); +``` +result: +```java +1 +Hello 1, response from provider: 10.32.114.45:36418 +1 +Hello 1, response from provider: 10.32.114.45:36418 +1 +Hello 1, response from provider: 10.32.114.45:36418 +1 +Hello 1, response from provider: 10.32.114.45:37885 +1 +Hello 1, response from provider: 10.32.114.45:36418 +1 +Hello 1, response from provider: 10.32.114.45:37885 +``` + +* 源码分析: +![加权最短响应时间](pic/加权最短响应时间.png) + +### ConsistentHashLoadBalance +* 简介: +一致性 hash 算法工作过程是,首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 2^32 - 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。 + +![一致性hash](pic/一致性hash.png) +出现的问题:数据倾斜 +改进:采用虚拟节点 +![一致性hash2](pic/一致性hash2.png) + +* 源码分析: +* demo: +provider1: +```java + setLoadBalance(service, "consistenthash", 200); + ProtocolConfig protocolConfig = setProtocolConfig(111); +``` +provider2: +```java + setLoadBalance(service, "consistenthash", 200); + ProtocolConfig protocolConfig = setProtocolConfig(222); +``` +result: +```java +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:111 +1 +Hello 1, response from provider: 10.32.114.45:111 +2 +Hello 2, response from provider: 10.32.114.45:222 +2 +Hello 2, response from provider: 10.32.114.45:222 +2 +Hello 2, response from provider: 10.32.114.45:222 +2 +Hello 2, response from provider: 10.32.114.45:222 +``` +* 源码分析: +![一致性hash_code](pic/一致性hash_code.png) +![一致性hash_code2](pic/一致性hash_code2.png) +![一致性hash_code3](pic/一致性hash_code3.png) +![一致性hash_code4](pic/一致性hash_code4.png) + + +### 总结 +**加权随机负载均衡**: 根据权重随机选取provider,权重越高,provider被选中的概率越大。 + +**加权轮询负载均衡**:轮询选取provider,且基于权重分配provider被选中的次数。如A、B、C三个provider,权重为1、1、2,那么一共四次的选取中,A、B、C被选中的次数分别为1、1、2次。 + +**最小活跃数负载均衡**: 选取最小活跃数的provider。若各provider的活跃数相同,则根据权重随机选择provider。 + +**最短响应时间负载均衡**: 选取最短响应时间的provider。若各provider的最短响应时间相同,则根据权重随机选择provider。 + +**一致性哈希负载均衡**: 在一致性哈希算法中,引入了虚拟节点的概念,在hash环上设置均匀分散的虚拟节点指向provider。实现相同参数的请求指向同一个provider。 + + +### 参考文档: +https://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/#m-zhdocsv27devsourceloadbalance +https://blog.csdn.net/qq_27243343/article/details/106459095 + + + + + diff --git "a/sig/rpc/talk/blog/pic/Dubbo_Annotation\345\210\235\345\247\213\345\214\226\346\265\201\347\250\213.png" "b/sig/rpc/talk/blog/pic/Dubbo_Annotation\345\210\235\345\247\213\345\214\226\346\265\201\347\250\213.png" new file mode 100644 index 0000000..f6fd3d1 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/Dubbo_Annotation\345\210\235\345\247\213\345\214\226\346\265\201\347\250\213.png" differ diff --git "a/sig/rpc/talk/blog/pic/Dubbo_XML\345\210\235\345\247\213\345\214\226\346\265\201\347\250\213.png" "b/sig/rpc/talk/blog/pic/Dubbo_XML\345\210\235\345\247\213\345\214\226\346\265\201\347\250\213.png" new file mode 100644 index 0000000..3449252 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/Dubbo_XML\345\210\235\345\247\213\345\214\226\346\265\201\347\250\213.png" differ diff --git "a/sig/rpc/talk/blog/pic/Dubbo\345\220\257\345\212\250\346\265\201\347\250\213\346\246\202\350\247\210.png" "b/sig/rpc/talk/blog/pic/Dubbo\345\220\257\345\212\250\346\265\201\347\250\213\346\246\202\350\247\210.png" new file mode 100644 index 0000000..b86b1ff Binary files /dev/null and "b/sig/rpc/talk/blog/pic/Dubbo\345\220\257\345\212\250\346\265\201\347\250\213\346\246\202\350\247\210.png" differ diff --git "a/sig/rpc/talk/blog/pic/Provider\346\234\215\345\212\241\346\232\264\351\234\262\346\265\201\347\250\213.svg" "b/sig/rpc/talk/blog/pic/Provider\346\234\215\345\212\241\346\232\264\351\234\262\346\265\201\347\250\213.svg" new file mode 100644 index 0000000..baad66a --- /dev/null +++ "b/sig/rpc/talk/blog/pic/Provider\346\234\215\345\212\241\346\232\264\351\234\262\346\265\201\347\250\213.svg" @@ -0,0 +1 @@ +DubboBootstrapDubboBootstrapServiceConfigServiceConfigConfigurableMetadataServiceExporterConfigurableMetadataServiceExporterProtocolFilterWrapperProtocolFilterWrapperProtocolListenerWrapperProtocolListenerWrapperRegistryProtocolRegistryProtocolServiceDiscoveryRegistryServiceDiscoveryRegistryFailbackRegistryFailbackRegistryInjvmProtocolInjvmProtocolWritableMetadataServiceWritableMetadataServiceTripleProtocolTripleProtocol1暴露Triple服务2为Invoker添加Filter3为Exporter添加Listener4暴露Injvm协议的服务5注册service-discovery-registry协议67注册Metadata服务8为Invoker添加Filter9为Exporter添加Listener10存储元数据MetedataInfo11注册并订阅service-discovery-registry12注册registry协议1314暴露Provider15为Invoker添加Filter16为Exporter添加Listener17暴露Triple协议的服务18注册并订阅registry1920暴露MetadataService服务21为Invoker添加Filter22为Exporter添加Listener23暴露Injvm协议的MetadataService服务 \ No newline at end of file diff --git "a/sig/rpc/talk/blog/pic/Reference\346\240\270\345\277\203\345\220\257\345\212\250\346\265\201\347\250\213.png" "b/sig/rpc/talk/blog/pic/Reference\346\240\270\345\277\203\345\220\257\345\212\250\346\265\201\347\250\213.png" new file mode 100644 index 0000000..bc42c9b Binary files /dev/null and "b/sig/rpc/talk/blog/pic/Reference\346\240\270\345\277\203\345\220\257\345\212\250\346\265\201\347\250\213.png" differ diff --git "a/sig/rpc/talk/blog/pic/Service\346\240\270\345\277\203\345\220\257\345\212\250\346\265\201\347\250\213.png" "b/sig/rpc/talk/blog/pic/Service\346\240\270\345\277\203\345\220\257\345\212\250\346\265\201\347\250\213.png" new file mode 100644 index 0000000..c9cca56 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/Service\346\240\270\345\277\203\345\220\257\345\212\250\346\265\201\347\250\213.png" differ diff --git a/sig/rpc/talk/blog/pic/dubbo-extension.jpg b/sig/rpc/talk/blog/pic/dubbo-extension.jpg new file mode 100644 index 0000000..5ebe3c9 Binary files /dev/null and b/sig/rpc/talk/blog/pic/dubbo-extension.jpg differ diff --git a/sig/rpc/talk/blog/pic/dubbo-framework.jpg b/sig/rpc/talk/blog/pic/dubbo-framework.jpg new file mode 100644 index 0000000..005408a Binary files /dev/null and b/sig/rpc/talk/blog/pic/dubbo-framework.jpg differ diff --git a/sig/rpc/talk/blog/pic/image.png b/sig/rpc/talk/blog/pic/image.png new file mode 100644 index 0000000..afe4175 Binary files /dev/null and b/sig/rpc/talk/blog/pic/image.png differ diff --git a/sig/rpc/talk/blog/pic/invoker.png b/sig/rpc/talk/blog/pic/invoker.png new file mode 100644 index 0000000..196abc2 Binary files /dev/null and b/sig/rpc/talk/blog/pic/invoker.png differ diff --git a/sig/rpc/talk/blog/pic/invoker1.png b/sig/rpc/talk/blog/pic/invoker1.png new file mode 100644 index 0000000..a64e3c0 Binary files /dev/null and b/sig/rpc/talk/blog/pic/invoker1.png differ diff --git a/sig/rpc/talk/blog/pic/invoker2.png b/sig/rpc/talk/blog/pic/invoker2.png new file mode 100644 index 0000000..25efc10 Binary files /dev/null and b/sig/rpc/talk/blog/pic/invoker2.png differ diff --git a/sig/rpc/talk/blog/pic/invoker3.png b/sig/rpc/talk/blog/pic/invoker3.png new file mode 100644 index 0000000..8b0652e Binary files /dev/null and b/sig/rpc/talk/blog/pic/invoker3.png differ diff --git a/sig/rpc/talk/blog/pic/invoker4.png b/sig/rpc/talk/blog/pic/invoker4.png new file mode 100644 index 0000000..e8aade9 Binary files /dev/null and b/sig/rpc/talk/blog/pic/invoker4.png differ diff --git a/sig/rpc/talk/blog/pic/invoker5.png b/sig/rpc/talk/blog/pic/invoker5.png new file mode 100644 index 0000000..260000f Binary files /dev/null and b/sig/rpc/talk/blog/pic/invoker5.png differ diff --git a/sig/rpc/talk/blog/pic/tomcat.png b/sig/rpc/talk/blog/pic/tomcat.png new file mode 100644 index 0000000..20c6f38 Binary files /dev/null and b/sig/rpc/talk/blog/pic/tomcat.png differ diff --git "a/sig/rpc/talk/blog/pic/triple-invoke\350\260\203\347\224\250.png" "b/sig/rpc/talk/blog/pic/triple-invoke\350\260\203\347\224\250.png" new file mode 100644 index 0000000..2f85ee9 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/triple-invoke\350\260\203\347\224\250.png" differ diff --git "a/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash.png" "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash.png" new file mode 100644 index 0000000..a24fb8e Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash.png" differ diff --git "a/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash2.png" "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash2.png" new file mode 100644 index 0000000..947e590 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash2.png" differ diff --git "a/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code.png" "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code.png" new file mode 100644 index 0000000..e90af9f Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code.png" differ diff --git "a/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code2.png" "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code2.png" new file mode 100644 index 0000000..0529e14 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code2.png" differ diff --git "a/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code3.png" "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code3.png" new file mode 100644 index 0000000..95c99e1 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code3.png" differ diff --git "a/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code4.png" "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code4.png" new file mode 100644 index 0000000..87416b3 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\344\270\200\350\207\264\346\200\247hash_code4.png" differ diff --git "a/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\346\234\200\347\237\255\345\223\215\345\272\224\346\227\266\351\227\264.png" "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\346\234\200\347\237\255\345\223\215\345\272\224\346\227\266\351\227\264.png" new file mode 100644 index 0000000..8600945 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\346\234\200\347\237\255\345\223\215\345\272\224\346\227\266\351\227\264.png" differ diff --git "a/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2421.png" "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2421.png" new file mode 100644 index 0000000..fab26d7 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2421.png" differ diff --git "a/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2422.png" "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2422.png" new file mode 100644 index 0000000..1d36eb8 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2422.png" differ diff --git "a/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2423.png" "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2423.png" new file mode 100644 index 0000000..ac6ba0f Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\345\212\240\346\235\203\350\275\256\350\257\2423.png" differ diff --git "a/sig/rpc/talk/blog/pic/\346\232\264\351\234\262Injvm\345\215\217\350\256\256\347\232\204\346\234\215\345\212\241.svg" "b/sig/rpc/talk/blog/pic/\346\232\264\351\234\262Injvm\345\215\217\350\256\256\347\232\204\346\234\215\345\212\241.svg" new file mode 100644 index 0000000..f4c445c --- /dev/null +++ "b/sig/rpc/talk/blog/pic/\346\232\264\351\234\262Injvm\345\215\217\350\256\256\347\232\204\346\234\215\345\212\241.svg" @@ -0,0 +1 @@ +DubboBootstrapDubboBootstrapServiceConfigServiceConfigProtocolFilterWrapperProtocolFilterWrapperFilterChainBuilderFilterChainBuilderProtocolListenerWrapperProtocolListenerWrapperListenerExporterWrapperListenerExporterWrapperInjvmProtocolInjvmProtocol1暴露Triple服务23为Invoker添加Filter4返回嵌套Filter的Invoker56为Exporter添加ExporterListener7返回嵌套ExporterListener的Exporter8暴露Injvm协议的服务 \ No newline at end of file diff --git "a/sig/rpc/talk/blog/pic/\346\232\264\351\234\262MetadataService\346\234\215\345\212\241.svg" "b/sig/rpc/talk/blog/pic/\346\232\264\351\234\262MetadataService\346\234\215\345\212\241.svg" new file mode 100644 index 0000000..3ced9bf --- /dev/null +++ "b/sig/rpc/talk/blog/pic/\346\232\264\351\234\262MetadataService\346\234\215\345\212\241.svg" @@ -0,0 +1 @@ +DubboBootstrapDubboBootstrapConfigurableMetadataServiceExporterConfigurableMetadataServiceExporterServiceConfigServiceConfigProtocolFilterWrapperProtocolFilterWrapperFilterChainBuilderFilterChainBuilderProtocolListenerWrapperProtocolListenerWrapperListenerExporterWrapperListenerExporterWrapperInjvmProtocolInjvmProtocol12暴露MetadataService服务34为Invoker添加Filter5返回嵌套Filter的Invoker67为Exporter添加ExporterListener8返回嵌套ExporterListener的Exporter9暴露Injvm协议的Triple服务 \ No newline at end of file diff --git "a/sig/rpc/talk/blog/pic/\346\232\264\351\234\262Triple\345\215\217\350\256\256\347\232\204\346\234\215\345\212\241\345\271\266\346\263\250\345\206\214registry\345\215\217\350\256\256.svg" "b/sig/rpc/talk/blog/pic/\346\232\264\351\234\262Triple\345\215\217\350\256\256\347\232\204\346\234\215\345\212\241\345\271\266\346\263\250\345\206\214registry\345\215\217\350\256\256.svg" new file mode 100644 index 0000000..c395809 --- /dev/null +++ "b/sig/rpc/talk/blog/pic/\346\232\264\351\234\262Triple\345\215\217\350\256\256\347\232\204\346\234\215\345\212\241\345\271\266\346\263\250\345\206\214registry\345\215\217\350\256\256.svg" @@ -0,0 +1 @@ +ServiceConfigServiceConfigProtocolFilterWrapperProtocolFilterWrapperProtocolListenerWrapperProtocolListenerWrapperRegistryProtocolRegistryProtocolFilterChainBuilderFilterChainBuilderListenerExporterWrapperListenerExporterWrapperTripleProtocolTripleProtocolListenerRegistryWrapperListenerRegistryWrapperZookeeperRegistryZookeeperRegistryServiceNameMappingServiceNameMappingRegistry CenterRegistry Center注册registry协议透传注册Triple服务为Invoker添加Filter返回嵌套Filter的Invoker为Exporter添加ExporterListener返回嵌套ExporterListener的Exporter暴露Triple协议的服务调用Registry注册Triple服务到注册中心对ZookeeperRegistry增加RegistryServiceListener将Triple实例数据注册到注册中心调用RegistryServiceListener#onRegistry发布注册事件调用ListenerRegistryWrapper#subscribe订阅地址变更事件建立暴露的Triple服务与MetadataReport之间的关系,通过版本号的变更通知Consumer端 \ No newline at end of file diff --git "a/sig/rpc/talk/blog/pic/\346\263\250\345\206\214service-discovery-registry\345\215\217\350\256\256.svg" "b/sig/rpc/talk/blog/pic/\346\263\250\345\206\214service-discovery-registry\345\215\217\350\256\256.svg" new file mode 100644 index 0000000..17180e8 --- /dev/null +++ "b/sig/rpc/talk/blog/pic/\346\263\250\345\206\214service-discovery-registry\345\215\217\350\256\256.svg" @@ -0,0 +1 @@ +ServiceConfigServiceConfigProtocolFilterWrapperProtocolFilterWrapperProtocolListenerWrapperProtocolListenerWrapperRegistryProtocolRegistryProtocolFilterChainBuilderFilterChainBuilderListenerExporterWrapperListenerExporterWrapperTripleProtocolTripleProtocolListenerRegistryWrapperListenerRegistryWrapperServiceDiscoveryRegistryServiceDiscoveryRegistryWritableMetadataServiceWritableMetadataService1注册service-discovery-registry协议2透传3注册Metadata服务45为Invoker添加Filter6返回嵌套Filter的Invoker78为Exporter添加ExporterListener9返回嵌套ExporterListener的Exporter10暴露Triple协议的服务11调用Registry注册元数据12对ServiceDiscoveryRegistry增加RegistryServiceListener13注册元数据MetedataInfo14调用RegistryServiceListener#onRegistry发布注册成功事件15调用RegistryProtocolListener#onExport发布服务注册事件 \ No newline at end of file diff --git "a/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2501.png" "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2501.png" new file mode 100644 index 0000000..b19ad25 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2501.png" differ diff --git "a/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2502.png" "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2502.png" new file mode 100644 index 0000000..a77f3b4 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2502.png" differ diff --git "a/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2503.png" "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2503.png" new file mode 100644 index 0000000..3df60ae Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2503.png" differ diff --git "a/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2504.png" "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2504.png" new file mode 100644 index 0000000..243002c Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\346\264\273\350\267\203\350\260\203\347\224\2504.png" differ diff --git "a/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241.png" "b/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241.png" new file mode 100644 index 0000000..c52bf58 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241.png" differ diff --git "a/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241\346\272\220\347\240\2011.png" "b/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241\346\272\220\347\240\2011.png" new file mode 100644 index 0000000..790a089 Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241\346\272\220\347\240\2011.png" differ diff --git "a/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241\346\272\220\347\240\2012.png" "b/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241\346\272\220\347\240\2012.png" new file mode 100644 index 0000000..db7a95c Binary files /dev/null and "b/sig/rpc/talk/blog/pic/\351\232\217\346\234\272\350\264\237\350\275\275\345\235\207\350\241\241\346\272\220\347\240\2012.png" differ diff --git "a/sig/rpc/talk/blog/\346\267\261\345\205\245\350\247\243\346\236\220Dubbo3.0\346\234\215\345\212\241\347\253\257\346\232\264\351\234\262\345\205\250\346\265\201\347\250\213.md" "b/sig/rpc/talk/blog/\346\267\261\345\205\245\350\247\243\346\236\220Dubbo3.0\346\234\215\345\212\241\347\253\257\346\232\264\351\234\262\345\205\250\346\265\201\347\250\213.md" new file mode 100644 index 0000000..e82c0ad --- /dev/null +++ "b/sig/rpc/talk/blog/\346\267\261\345\205\245\350\247\243\346\236\220Dubbo3.0\346\234\215\345\212\241\347\253\257\346\232\264\351\234\262\345\205\250\346\265\201\347\250\213.md" @@ -0,0 +1,997 @@ +# 深入解析Dubbo3.0服务端暴露全流程 +## 背景 +随着云原生时代的到来,Dubbo3.0的一个很重要的目标就是全面拥抱云原生。正因如此,Dubbo3.0为了能够更好的适配云原生,将原来的接口级服务发现机制演进为应用级服务发现机制。 +基于应用级服务发现机制,Dubbo3.0能大幅降低框架带来的额外资源消耗,大幅提升资源利用率,主要体现在: + +* 单机常驻内存下降 75% +* 能支持的集群实例规模以百万计的集群 +* 注册中心总体数据量下降超 90% + +目前关于Dubbo服务端暴露流程的技术文章很多,但是都是基于Dubbo接口级服务发现机制来解读的。在Dubbo3.0的应用级服务发现机制下,服务端暴露流程与之前有很大的变化,本文希望可以通过对Dubbo3.0源码理解来解析服务端暴露全流程。 + +## 什么是应用级服务发现 +简单来说,以前Dubbo是将接口的信息全部注册到注册中心,而一个应用实例一般会存在多个接口,这样一来注册的数据量就要大很多,而且有冗余。应用级服务发现的机制是同一个应用实例仅在注册中心注册一条数据,这种机制主要解决以下几个问题: +* 对齐主流微服务模型,如:Spring Cloud +* 支持Kubernetes native service,Kubernetes中维护调度的服务都是基于应用实例级,不支持接口级 +* 减少注册中心数据存储能力,降低了地址变更推送的压力 + +假设应用dubbo-application部署了3个实例(instance1, instance2, instance3),并且对外提供了3个接口(sayHello, echo, getVersion)分别设置了不同的超时时间。在接口级和应用级服务发现机制下,注册到注册中心的数据是截然不同的。如下图所示: + +* 接口级服务发现机制下注册中心中的数据 +```java +"sayHello": [ + {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, + {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, + {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}}, +], +"echo": [ + {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, + {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, + {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}}, +], +"getVersion": [ + {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, + {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, + {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}} +] +``` + +* 应用级服务发现机制下注册中心中的数据 +```java +"dubbo-application": [ + {"name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, + {"name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, + {"name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}} +] +``` + +通过对比我们可以发现,采用应用级服务发现机制确实使注册中心中的数据量减少了很多,那些原有的接口级的数据存储在元数据中心中。 + + +## 服务端暴露全流程 +引入应用级服务发现机制以后,Dubbo服务端暴露全流程和之前有很大的区别。暴露服务端全流程的核心代码在DubboBootstrap#doStart中,具体如下: +```java +private void doStart() { + // 1. 暴露Dubbo服务 + exportServices(); + // If register consumer instance or has exported services + if (isRegisterConsumerInstance() || hasExportedServices()) { + // 2. 暴露元数据服务 + exportMetadataService(); + // 3. 定时更新和上报元数据 + registerServiceInstance(); + .... + } + ...... +} +``` +假设用Zookeeper作为注册中,对外暴露Triple协议的服务为例,服务端暴露全流程时序图如下: +![Provider服务暴露流程](pic/Provider服务暴露流程.svg) + +我们可以看到,整个的暴露流程还是挺复杂的,一共可以分为四个部分: +* 暴露injvm协议的服务 +* 注册service-discovery-registry协议 +* 暴露Triple协议的服务并注册registry协议 +* 暴露MetadataService服务 + +下面会分别从这四个部分对服务暴露流程进行详细讲解。 + +### 暴露injvm协议的服务 +injvm协议的服务是暴露在本地的,主要原因是在一个应用上往往既有Service(暴露服务)又有Reference(服务引用)的情况存在,并且Reference引用的服务就是在该应用上暴露的Service。为了支持这种使用场景,Dubbo提供了injvm协议,将Service暴露在本地,Reference就可以不需要走网络直接在本地调用Service。 + +#### 整体时序图 +![暴露Injvm协议的服务](pic/暴露Injvm协议的服务.svg) + +#### 核心代码 +* 暴露injvm协议的服务入口 +核心代码在ServiceConfig#exportLocal中,具体如下: +```java +private void exportLocal(URL url) { + // 协议变更 + URL local = URLBuilder.from(url) + .setProtocol(LOCAL_PROTOCOL) + .setHost(LOCALHOST_VALUE) + .setPort(0) + .build(); + // 暴露injvm协议 + doExportUrl(local, false); + logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local); +} +``` + +* 创建Invoker +核心代码在ServiceConfig#doExportUrl中,具体如下: +```java +private void doExportUrl(URL url, boolean withMetaData) { + // 通过SPI动态创建Invoker + Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); + // 如果是暴露Metadata对Invoker进行包装 + if (withMetaData) { + invoker = new DelegateProviderMetaDataInvoker(invoker, this); + } + // 通过协议将Invoker进行包装成Exporter + // PROTOCOL为Triple协议的代理类 + Exporter exporter = PROTOCOL.export(invoker); + exporters.add(exporter); +} +``` + +* 在Invoker中添加Filter +核心代码在DefaultFilterChainBuilder#buildInvokerChain中,具体如下: +```java +public Invoker buildInvokerChain(final Invoker originalInvoker, String key, String group) { + Invoker last = originalInvoker; + List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group); + + // 为invoker添加8个Filter + // 1. TracerFilter + // 2. TimeoutFilter + // 3. MonitorFilter + // 4. ExceptionFilter + // 5. ContextFilter + // 6. GenericFilter + // 7. ClassLoaderFilter + // 8. EchoFilter + if (!filters.isEmpty()) { + for (int i = filters.size() - 1; i >= 0; i--) { + final Filter filter = filters.get(i); + final Invoker next = last; + last = new FilterChainNode<>(originalInvoker, next, filter); + } + } + return last; +} +``` + +* 将Invoker转化为Exporter +核心代码在ProtocolListenerWrapper#export中,具体如下: +```java +public Exporter export(Invoker invoker) throws RpcException { + // 如果是registry或者service-discovery-registry协议就直接通过协议将Invoker转化成Exporter + if (UrlUtils.isRegistry(invoker.getUrl())) { + return protocol.export(invoker); + } + // 1. 将invoker通过协议转化成Exporter + // 2. 在Exporter中组合ExporterListener + return new ListenerExporterWrapper(protocol.export(invoker), + Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) + .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY))); +} +``` + +* 在Exporter中组合ExporterListener +在ExporterListener中定义了exported和unexported事件,用来维护整个Exporter的生命周期,核心代码在ListenerExporterWrapper的构造函数和ListenerExporterWrapper#unexport中,具体如下: + +**ListenerExporterWrapper构造函数** +```java +public ListenerExporterWrapper(Exporter exporter, List listeners) { + ...... + this.exporter = exporter; + this.listeners = listeners; + if (CollectionUtils.isNotEmpty(listeners)) { + RuntimeException exception = null; + for (ExporterListener listener : listeners) { + if (listener != null) { + try { + // 暴露Exporter后 + listener.exported(this); + } catch (RuntimeException t) { + logger.error(t.getMessage(), t); + exception = t; + } + } + } + ...... + } +} +``` +**ListenerExporterWrapper#unexport** +```java +public void unexport() { + try { + exporter.unexport(); + } finally { + if (CollectionUtils.isNotEmpty(listeners)) { + RuntimeException exception = null; + for (ExporterListener listener : listeners) { + if (listener != null) { + try { + // 取消暴露Exporter后 + listener.unexported(this); + } catch (RuntimeException t) { + logger.error(t.getMessage(), t); + exception = t; + } + } + } + if (exception != null) { + throw exception; + } + } + } +} +``` + +* 暴露Triple协议的服务 +核心代码在TripleProtocol#export中,具体如下: +```java +public Exporter export(Invoker invoker) throws RpcException { + URL url = invoker.getUrl(); + String key = serviceKey(url); + final AbstractExporter exporter = new AbstractExporter(invoker) { + @Override + public void afterUnExport() { + pathResolver.remove(url.getServiceKey()); + pathResolver.remove(url.getServiceInterface()); + exporterMap.remove(key); + } + }; + + exporterMap.put(key, exporter); + invokers.add(invoker); + pathResolver.add(url.getServiceKey(), invoker); + pathResolver.add(url.getServiceInterface(), invoker); + PortUnificationExchanger.bind(invoker.getUrl()); + return exporter; +} +``` +由于Triple协议的实现流程相对来说比较复杂,暂时不在这里展开讨论,以后会对其进行详细讲解。 + +#### 注册service-discovery-registry协议 +注册service-discovery-registry协议的核心目的是为了注册与服务相关的元数据,默认情况下元数据通过InMemoryWritableMetadataService将数据存储在本地内存和本地文件。 + +##### 整体时序图 +![注册service-discovery-registry协议](pic/注册service-discovery-registry协议.svg) + +##### 核心代码 +核心代码在ServiceConfig#exportRemote中,具体如下: +* 注册service-discovery-registry协议的入口 +```java +private URL exportRemote(URL url, List registryURLs) { + if (CollectionUtils.isNotEmpty(registryURLs)) { + // 如果是多个注册中心,通过循环对每个注册中心进行注册 + for (URL registryURL : registryURLs) { + // 判断是否是service-discovery-registry协议 + // 将service-name-mapping参数的值设置为true + if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) { + url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true"); + } + ...... + // 注册service-discovery-registry协议复用服务暴露流程 + doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true); + } + ...... + return url; +} +``` + +* invoker中包装Metadata +核心代码在ServiceConfig#doExportUrl中,具体如下: +```java +private void doExportUrl(URL url, boolean withMetaData) { + Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); + // 此时的withMetaData的值为true + // 将invoker包装成DelegateProviderMetaDataInvoker + if (withMetaData) { + invoker = new DelegateProviderMetaDataInvoker(invoker, this); + } + Exporter exporter = PROTOCOL.export(invoker); + exporters.add(exporter); +} + + +public class DelegateProviderMetaDataInvoker implements Invoker { + protected final Invoker invoker; + private ServiceConfig metadata; + // 在Invoker中组合了metadata + public DelegateProviderMetaDataInvoker(Invoker invoker, ServiceConfig metadata) { + this.invoker = invoker; + this.metadata = metadata; + } + ...... + + @Override + public Result invoke(Invocation invocation) throws RpcException { + return invoker.invoke(invocation); + } + + public ServiceConfig getMetadata() { + return metadata; + } +} +``` + +* 通过RegistryProtocol将Invoker转化成Exporter +核心代码在ProtocolListenerWrapper#export中,具体如下: +```java +public Exporter export(Invoker invoker) throws RpcException { + // 此时的protocol为RegistryProtocol类型 + if (UrlUtils.isRegistry(invoker.getUrl())) { + return protocol.export(invoker); + } + ...... +} +``` + +* RegistryProtocol将Invoker转化成Exporter的核心流程 +核心代码在RegistryProtocol#export中,具体如下: +```java +public Exporter export(final Invoker originInvoker) throws RpcException { + URL registryUrl = getRegistryUrl(originInvoker); + URL providerUrl = getProviderUrl(originInvoker); + ...... + // 再次暴露Triple协议的服务 + final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl); + + // registryUrl中包含service-discovery-registry协议 + // 通过该协议创建ServiceDiscoveryRegistry对象 + // 然后组合RegistryServiceListener监听器, + // 最后包装成ListenerRegistryWrapper对象 + final Registry registry = getRegistry(registryUrl); + final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); + + boolean register = providerUrl.getParameter(REGISTER_KEY, true); + if (register) { + // 注册service-discovery-registry协议 + // 触发RegistryServiceListener的onRegister事件 + register(registry, registeredProviderUrl); + } + ...... + // 触发RegistryServiceListener的onRegister事件 + notifyExport(exporter); + return new DestroyableExporter<>(exporter); +} +``` + +* 再次暴露Triple协议的服务 +核心代码在RegistryProtocol#doLocalExport中,具体如下: +```java +private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) { + String key = getCacheKey(originInvoker); + // 此时的protocol为Triple协议的代理类 + // 和上面讲到的暴露injvm协议的PROTOCOL相同 + return (ExporterChangeableWrapper) bounds.computeIfAbsent(key, s -> { + Invoker invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); + return new ExporterChangeableWrapper<>((Exporter) protocol.export(invokerDelegate), originInvoker); + }); +} +``` + +* 创建ListenerRegistryWrapper对象 +核心代码在RegistryFactoryWrapper#getRegistry中,具体如下: +```java +public Registry getRegistry(URL url) { + return new ListenerRegistryWrapper(registryFactory.getRegistry(url), + Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class) + .getActivateExtension(url, "registry.listeners"))); +} +``` + +* 注册service-discovery-registry协议 +核心代码在ServiceDiscoveryRegistry#register和ServiceDiscoveryRegistry#doRegister中,具体如下: + +**ServiceDiscoveryRegistry#register** +```java +public final void register(URL url) { + // 只有服务端(Provider)才需要注册 + if (!shouldRegister(url)) { + return; + } + // 注册service-discovery-registry协议 + doRegister(url); +} +``` + +**ServiceDiscoveryRegistry#doRegister** +```java +public void doRegister(URL url) { + url = addRegistryClusterKey(url); + // 注册元数据 + if (writableMetadataService.exportURL(url)) { + if (logger.isInfoEnabled()) { + logger.info(format("The URL[%s] registered successfully.", url.toString())); + } + } else { + if (logger.isWarnEnabled()) { + logger.warn(format("The URL[%s] has been registered.", url.toString())); + } + } +} +``` + +**注册元数据** +核心代码在InMemoryWritableMetadataService#exportURL中,具体如下: +```java +public boolean exportURL(URL url) { + // 如果是MetadataService,则不注册元数据 + if (MetadataService.class.getName().equals(url.getServiceInterface())) { + this.metadataServiceURL = url; + return true; + } + + updateLock.readLock().lock(); + try { + String[] clusters = getRegistryCluster(url).split(","); + for (String cluster : clusters) { + MetadataInfo metadataInfo = metadataInfos.computeIfAbsent(cluster, k -> new MetadataInfo(ApplicationModel.getName())); + // 将Triple协议的服务中接口相关的数据生成ServiceInfo + // 将ServiceInfo注册到MetadataInfo中 + metadataInfo.addService(new ServiceInfo(url)); + } + metadataSemaphore.release(); + return addURL(exportedServiceURLs, url); + } finally { + updateLock.readLock().unlock(); + } +} +``` + +**发布onRegister事件** +核心代码在ListenerRegistryWrapper#register中,具体如下: +```java +public void register(URL url) { + try { + // registry为ServiceDiscoveryRegistry对象 + // 此时已经调用完ServiceDiscoveryRegistry#registry方法 + registry.register(url); + } finally { + if (CollectionUtils.isNotEmpty(listeners) && !UrlUtils.isConsumer(url)) { + RuntimeException exception = null; + for (RegistryServiceListener listener : listeners) { + if (listener != null) { + try { + // 注册完service-discovery-registry协议后发布onRegister事件 + listener.onRegister(url, registry); + } catch (RuntimeException t) { + logger.error(t.getMessage(), t); + exception = t; + } + } + } + if (exception != null) { + throw exception; + } + } + } +} +``` + +**发布服务注册事件** +核心代码在RegistryProtocol#notifyExport中,具体如下: +```java +private void notifyExport(ExporterChangeableWrapper exporter) { + List listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class) + .getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener"); + if (CollectionUtils.isNotEmpty(listeners)) { + for (RegistryProtocolListener listener : listeners) { + // 发布RegistryProtocolListener的onExport事件 + listener.onExport(this, exporter); + } + } +} +``` +我们可以看出注册service-discovery-registry协议的核心目的是为了将服务的接口相关的信息存储在内存中。从兼容性和平滑迁移两方面来考虑,社区在实现的时候采取复用ServiceConfig的暴露流程的方式。 + +##### 暴露Triple协议服务并注册registry协议 +暴露Triple协议的服务并注册registry协议是Dubbo服务暴露的核心流程,一共分为两部分: +* 暴露Triple协议的服务 +* 注册registry协议 + +由于暴露Triple协议服务的流程和暴露Injvm协议服务的流程是一致的,所以不再赘述。注册registry协议的过程仅仅注册了应用实例相关的信息,也就是之前提到的应用级服务发现机制。 + +**整体时序图** +![暴露Triple协议的服务并注册registry协议](pic/暴露Triple协议的服务并注册registry协议.svg) + +**核心代码** +* 通过InterfaceCompatibleRegistryProtocol将Invoker转化成Exporter + +核心代码在ProtocolListenerWrapper#export中,具体如下: +```java +public Exporter export(Invoker invoker) throws RpcException { + // 此时的protocol为InterfaceCompatibleRegistryProtocol类型(继承了RegistryProtocol) + // 注意:在注册service-discovery-registry协议的时候protocol为RegistryProtocol类型 + if (UrlUtils.isRegistry(invoker.getUrl())) { + return protocol.export(invoker); + } + ...... +} +``` + +* RegistryProtocol将Invoker转化成Exporter的核心流程 + +核心代码在RegistryProtocol#export中,具体如下: +```java +public Exporter export(final Invoker originInvoker) throws RpcException { + URL registryUrl = getRegistryUrl(originInvoker); + URL providerUrl = getProviderUrl(originInvoker); + ...... + // 再次暴露Triple协议的服务 + final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl); + + // registryUrl中包含registry协议 + // 通过该协议创建ZookeeperRegistry对象 + // 然后组合RegistryServiceListener监听器, + // 最后包装成ListenerRegistryWrapper对象 + // 注意: + // 1. service-discovery-registry协议对应的是ServiceDiscoveryRegistry + // 2. registry协议对应的是ZookeeperRegistry + final Registry registry = getRegistry(registryUrl); + final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); + + boolean register = providerUrl.getParameter(REGISTER_KEY, true); + if (register) { + // 注册registry协议 + // 触发RegistryServiceListener的onRegister事件 + register(registry, registeredProviderUrl); + } + ...... + // 发布RegistryProtocolListener的onExport事件 + notifyExport(exporter); + return new DestroyableExporter<>(exporter); +} +``` + +* 注册registry协议 + +核心代码在FailbackRegistry#register和ServiceDiscoveryRegistry#doRegister中(ZookeeperRegistry继承FailbackRegistry)中,具体如下: + +**FailbackRegistry#register** +```java +public void register(URL url) { + if (!acceptable(url)) { + ...... + try { + // 注册registry协议 + doRegister(url); + } catch (Exception e) { + ...... + } + } +} +``` + +**ZookeeperRegistry#doRegister** +```java +public void doRegister(URL url) { + try { + // 在zookeeper上注册Provider + // 目录:/dubbo/xxxService/providers/*** + // 数据:dubbo://192.168.31.167:20800/xxxService?anyhost=true& + // application=application-name&async=false&deprecated=false&dubbo=2.0.2& + // dynamic=true&file.cache=false&generic=false&interface=xxxService& + // metadata-type=remote&methods=hello&pid=82470&release=& + // service-name-mapping=true&side=provider×tamp=1629588251493 + zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); + } catch (Throwable e) { + throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); + } +} +``` + +* 订阅地址变更 + +核心代码在FailbackRegistry#subscribe和ZookeeperRegistry#doSubscribe中,具体如下: +**FailbackRegistry#subscribe** +```java +public void subscribe(URL url, NotifyListener listener) { + ...... + try { + // 调用ZookeeperRegistry#doSubscribe + doSubscribe(url, listener); + } catch (Exception e) { + ...... +} +``` + +**ZookeeperRegistry#doSubscribe** +```java +public void doSubscribe(final URL url, final NotifyListener listener) { + try { + if (ANY_VALUE.equals(url.getServiceInterface())) { + ...... + } else { + ...... + for (String path : toCategoriesPath(url)) { + ConcurrentMap listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); + ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch)); + if (zkListener instanceof RegistryChildListenerImpl) { + ((RegistryChildListenerImpl) zkListener).setLatch(latch); + } + // 创建临时节点用来存储configurators数据 + // 目录:/dubbo/xxxService/configurators + // 数据:应用的配置信息,可以在dubbo-admin中进行修改,默认为空 + zkClient.create(path, false); + // 添加监听器,用来监听configurators中的变化 + List children = zkClient.addChildListener(path, zkListener); + if (children != null) { + urls.addAll(toUrlsWithEmpty(url, path, children)); + } + } + ...... + } + } catch (Throwable e) { + ...... + } +} +``` + +* 建立暴露的Triple协议服务与Metadata之间的联系 + +核心代码在ServiceConfig#exportUrl、MetadataUtils#publishServiceDefinition、InMemoryWritableMetadataService#publishServiceDefinition、RemoteMetadataServiceImpl#publishServiceDefinition和MetadataReport#storeProviderMetadata中,具体如下: + +**ServiceConfig#exportUrl** +```java +private void exportUrl(URL url, List registryURLs) { + ...... + if (!SCOPE_NONE.equalsIgnoreCase(scope)) { + ...... + if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { + url = exportRemote(url, registryURLs); + // 发布事件,更新服务接口相关的数据 + MetadataUtils.publishServiceDefinition(url); + } + } + ...... +} +``` + +**MetadataUtils#publishServiceDefinition** +```java +public static void publishServiceDefinition(URL url) { + // 将服务接口相关的数据存在到InMemoryWritableMetadataService中 + WritableMetadataService.getDefaultExtension().publishServiceDefinition(url); + // 将服务接口相关的数据存在到远端的元数据中心 + if (REMOTE_METADATA_STORAGE_TYPE.equalsIgnoreCase(url.getParameter(METADATA_KEY))) { + getRemoteMetadataService().publishServiceDefinition(url); + } +} +``` + +**InMemoryWritableMetadataService#publishServiceDefinition** +```java +public void publishServiceDefinition(URL url) { + try { + String interfaceName = url.getServiceInterface(); + if (StringUtils.isNotEmpty(interfaceName) + && !ProtocolUtils.isGeneric(url.getParameter(GENERIC_KEY))) { + Class interfaceClass = Class.forName(interfaceName); + ServiceDefinition serviceDefinition = ServiceDefinitionBuilder.build(interfaceClass); + Gson gson = new Gson(); + String data = gson.toJson(serviceDefinition); + // 存储服务接口相关数据 + // 数据格式: + // { + // "canonicalName": "xxxService", + // "codeSource": "file:/Users/xxxx", + // "methods": [{ + // "name": "hello", + // "parameterTypes": ["java.lang.String"], + // "returnType": "java.lang.String", + // "annotations": [] + // }], + // "types": [{ + // "type": "java.lang.String" + // }], + // "annotations": [] + // } + serviceDefinitions.put(url.getServiceKey(), data); + return; + } else if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { + ...... + } + ...... + } catch (Throwable e) { + ...... + } +} +``` + +**RemoteMetadataServiceImpl#publishServiceDefinition** + +```java +public void publishServiceDefinition(URL url) { + checkRemoteConfigured(); + String side = url.getSide(); + if (PROVIDER_SIDE.equalsIgnoreCase(side)) { + // 发布服务端(Provider)的服务接口信息到元数据中心 + publishProvider(url); + } else { + ...... + } +} +``` + +**RemoteMetadataServiceImpl#publishProvider** + +```java +private void publishProvider(URL providerUrl) throws RpcException { + ...... + try { + String interfaceName = providerUrl.getServiceInterface(); + if (StringUtils.isNotEmpty(interfaceName)) { + ...... + for (Map.Entry entry : getMetadataReports().entrySet()) { + // 获取MetadataReport服务,该服务用来访问元数据中心 + MetadataReport metadataReport = entry.getValue(); + // 将服务接口信息存储到元数据中心 + metadataReport.storeProviderMetadata(new MetadataIdentifier(providerUrl.getServiceInterface(), + providerUrl.getVersion(), providerUrl.getGroup(), + PROVIDER_SIDE, providerUrl.getApplication()), fullServiceDefinition); + } + return; + } + ...... + } catch (ClassNotFoundException e) { + ...... + } +} +``` + +**AbstractMetadataReport#storeProviderMetadata** + +```java +public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition){ + if (syncReport) { + storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition); + } else { + // 异步存储到元数据中心 + reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition)); + } +} + +private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { + try { + ...... + allMetadataReports.put(providerMetadataIdentifier, serviceDefinition); + failedReports.remove(providerMetadataIdentifier); + Gson gson = new Gson(); + // data的数据格式: + // { + // "parameters": { + // "side": "provider", + // "interface": "xxxService", + // "metadata-type": "remote", + // "service-name-mapping": "true", + // }, + // "canonicalName": "xxxService", + // "codeSource": "file:/Users/xxxx", + // "methods": [{ + // "name": "hello", + // "parameterTypes": ["java.lang.String"], + // "returnType": "java.lang.String", + // "annotations": [] + // }], + // "types": [{ + // "type": "java.lang.String" + // }], + // "annotations": [] + // } + String data = gson.toJson(serviceDefinition); + // 存储到元数据中心,实例中的元数据中心是ZookeeperMetadataReport + // 目录:元数据中心Metadata-report的/dubbo/metadata/xxxService/provider/${application-name}节点下 + doStoreProviderMetadata(providerMetadataIdentifier, data); + // 存储到本地文件 + // 路径:xxxService:::provider:${application-name}  + saveProperties(providerMetadataIdentifier, data, true, !syncReport); + } catch (Exception e) { + ...... + } +} +``` + +* 建立Triple协议服务与MetadataReport服务之间的关系 + +核心代码在ServiceConfig#exported、MetadataServiceNameMapping#map和ZookeeperMetadataReport#registerServiceAppMapping中,具体如下: + +**ServiceConfig#exported** + +```java +protected void exported() { + exported = true; + List exportedURLs = this.getExportedUrls(); + exportedURLs.forEach(url -> { + // 判断URL中是否标记有service-name-mapping的字段 + // 标记有该字段的服务是需要将暴露的服务与元数据中心关联起来 + // Consumer可以通过元数据中心的消息变更感知到Provider端元数据的变更 + if (url.getParameters().containsKey(SERVICE_NAME_MAPPING_KEY)) { + ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension(); + // 建立关系 + serviceNameMapping.map(url); + } + }); + onExported(); +} +``` + +**MetadataServiceNameMapping#map** + +```java +public void map(URL url) { + execute(() -> { + String registryCluster = getRegistryCluster(url); + // 获取MetadataReport,也就是元数据中心的访问路径 + MetadataReport metadataReport = MetadataReportInstance.getMetadataReport(registryCluster); + ...... + int currentRetryTimes = 1; + boolean success; + String newConfigContent = getName(); + do { + // 获取元数据中心中存储的应用的版本信息 + ConfigItem configItem = metadataReport.getConfigItem(serviceInterface, DEFAULT_MAPPING_GROUP); + String oldConfigContent = configItem.getContent(); + if (StringUtils.isNotEmpty(oldConfigContent)) { + boolean contains = StringUtils.isContains(oldConfigContent, getName()); + if (contains) { + break; + } + newConfigContent = oldConfigContent + COMMA_SEPARATOR + getName(); + } + // 在元数据中心创建mapping节点,并将暴露的服务数据存到元数据中心,这里的元数据中心用zookeeper实现的 + // 目录:/dubbo/mapping/xxxService + // 数据:configItem.content为${application-name},configItem.ticket为版本好 + success = metadataReport.registerServiceAppMapping(serviceInterface, DEFAULT_MAPPING_GROUP, newConfigContent, configItem.getTicket()); + } while (!success && currentRetryTimes++ <= CAS_RETRY_TIMES); + }); +} +``` + +**ZookeeperMetadataReport#registerServiceAppMapping** + +```java +public boolean registerServiceAppMapping(String key, String group, String content, Object ticket) { + try { + if (ticket != null && !(ticket instanceof Stat)) { + throw new IllegalArgumentException("zookeeper publishConfigCas requires stat type ticket"); + } + String pathKey = buildPathKey(group, key); + // 1. 创建/dubbo/mapping/xxxService目录,存储的数据为configItem + // 2. 生成版本号 + zkClient.createOrUpdate(pathKey, content, false, ticket == null ? 0 : ((Stat) ticket).getVersion()); + return true; + } catch (Exception e) { + logger.warn("zookeeper publishConfigCas failed.", e); + return false; + } +} +``` + +到这里,暴露Triple协议的服务并注册registry协议的流程就结束了。我们可以看出,这一部分的逻辑还是挺复杂的。主要是将以前接口级服务发现机制中注册到注册中心中的数据(应用实例数据+服务接口数据)拆分出来了。注册registry协议部分将应用实例数据注册到注册中心,在Exporter暴露完以后通过调用MetadataUtils#publishServiceDefinition将服务接口数据注册到元数据中心。 + +##### 暴露MetadataService服务 + +MetadataService主要是对Consumer侧提供一个可以获取元数据的API,暴露流程是复用了Triple协议的服务暴露的流程 + +**整体时序图** +![暴露MetadataService服务](pic/暴露MetadataService服务.svg) + +**核心代码** +* 暴露MetadataService的入口 + +核心代码在DubboBootstrap#exportMetadataService中,具体如下: +```java +private void exportMetadataService() { + // 暴露MetadataServer + metadataServiceExporter.export(); +} +``` + +* 暴露MetadataService + +核心代码在ConfigurableMetadataServiceExporter#export中,具体如下: + +```java +public ConfigurableMetadataServiceExporter export() { + + if (!isExported()) { + // 定义MetadataService的ServiceConfig + ServiceConfig serviceConfig = new ServiceConfig<>(); + serviceConfig.setApplication(getApplicationConfig()); + // 不会注册到注册中心 + serviceConfig.setRegistry(new RegistryConfig("N/A")); + serviceConfig.setProtocol(generateMetadataProtocol()); + serviceConfig.setInterface(MetadataService.class); + serviceConfig.setDelay(0); + serviceConfig.setRef(metadataService); + serviceConfig.setGroup(getApplicationConfig().getName()); + serviceConfig.setVersion(metadataService.version()); + serviceConfig.setMethods(generateMethodConfig()); + // 用暴露Triple协议服务的流程来暴露MetadataService + // 采用的是Dubbo协议 + serviceConfig.export(); + this.serviceConfig = serviceConfig; + } + return this; +} +``` + +由于暴露MetadataService的流程是复用前面提到的暴露Triple协议服务的流程,整个过程有少许地方会不同,这些不同之处在上面的代码中都已经标明,所以就不再赘述了。 + +* 注册ServiceInstance实例 + +注册注册ServiceInstance的目的是为了定时更新Metadata,当有更新的时候就会通过MetadataReport来更新版本号让Consumer端感知到。 +核心代码在DubboBootstrap#registerServiceInstance和DubboBootstrap#doRegisterServiceInstance中,具体如下: + +**DubboBootstrap#registerServiceInstance** +```java +private void registerServiceInstance() { + .... + // 创建ServiceInstance + // ServiceInstance中包含以下字段 + // 1. serviceName:${application-name} + // 2. host: 192.168.31.167 + // 3. port: 2080 + // 4. metadata: 服务接口级相关的数据,比如:methods等数据 + // 同时,还会对ServiceInstance数据中的字段进行补充,分别调用下面4个ServiceInstanceCustomizer实例 + // 1)ServiceInstanceMetadataCustomizer + // 2)MetadataServiceURLParamsMetadataCustomizer + // 3)ProtocolPortsMetadataCustomizer + // 4)ServiceInstanceHostPortCustomizer + ServiceInstance serviceInstance = createServiceInstance(serviceName); + boolean registered = true; + try { + // 注册ServiceInstance + doRegisterServiceInstance(serviceInstance); + } catch (Exception e) { + registered = false; + logger.error("Register instance error", e); + } + // 如果注册成功,定时更新Metadata,没10s更新一次 + if(registered){ + executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> { + ...... + try { + // 刷新Metadata和ServiceInstance + ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance); + } catch (Exception e) { + ...... + } finally { + ...... + } + }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS); + } +} +``` + +**DubboBootstrap#doRegisterServiceInstance** + +```java +private void doRegisterServiceInstance(ServiceInstance serviceInstance) { + if (serviceInstance.getPort() > 0) { + // 发布Metadata数据到远端存储元数据中心 + // 调用RemoteMetadataServiceImpl#publishMetadata, + // 内部会调用metadataReport#publishAppMetadata + publishMetadataToRemote(serviceInstance); + logger.info("Start registering instance address to registry."); + getServiceDiscoveries().forEach(serviceDiscovery ->{ + ServiceInstance serviceInstanceForRegistry = new DefaultServiceInstance((DefaultServiceInstance) serviceInstance); + calInstanceRevision(serviceDiscovery, serviceInstanceForRegistry); + ...... + // 调用ZookeeperServiceDiscovery#doRegister注册serviceInstance实例 + // 将应用服务信息注册到注册中心中 + // 目录:/services/${application-name}/192.168.31.167:20800 + // 数据:serviceInstance序列化后的byte数组 + serviceDiscovery.register(serviceInstanceForRegistry); + }); + } +} +``` + +通过上面对分析,我们可以很容易知道 + +* ServiceInstance是中包含Metadata +* Metadata是存储在InMemoryWritableMetadataService中的元数据,占用的是本地内存空间 +* InMemoryWritableMetadataService用来更新Metadata +* ServiceInstance是存储在远端元数据注册中心中的数据结构 +* RemoteMetadataServiceImpl会调用metadataReport将ServiceInstance数据更新到远端元数据注册中心 + +##### 总结 +通过对Dubbo3.0服务端暴露全流程的解析可以看出,尽管应用级服务发现机制的实现要复杂很多,但是Dubbo3.0为了能够让使用者平滑迁移,兼容了2.7.x的版本,所以在设计的时候很多地方都尽可能复用之前的流程。 +从最近Dubbo3.0发布的Benchmark数据来看,Dubbo3.0的性能和资源利用上确实提升了不少。Dubbo3.0在拥抱云原生的道路上还有很长的一段路要走,社区正在对Dubbo3.0中核心流程进行梳理和优化,后续计划支持多实例应用部署,希望有兴趣见证Dubbo云原生之路的同学可以积极参与社区贡献! + +##### 作者 +熊聘,Github账号pinxiong,Apache Dubbo贡献者,关注RPC、Service Mesh和云原生等领域。现任职于携程国际事业部研发团队,负责市场营销、云原生等相关工作。 + + +##### 参考 +* [Dubbo迈出云原生重要一步-应用级服务发现解析](https://mp.weixin.qq.com/s/m26_VnEwLSFIlscyEU_pTg) +* [资源利用率提升达75%,Dubbo 3.0 Benchmark 发布](https://mp.weixin.qq.com/s/vM1MtNZ2n0zm5tKc90D3ag) \ No newline at end of file