Apache Dubbo介绍
Apache Dubbo 是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。
简介
Apache Dubbo ˈdʌbəʊ is a high-performance, java based RPC framework open-sourced by Alibaba. As in many RPC systems, dubbo is based around the idea of defining a service, specifying the methods that can be called remotely with their parameters and return types. On the server side, the server implements this interface and runs a dubbo server to handle client calls. On the client side, the client has a stub that provides the same methods as the server.
翻译:
Apache Dubbo作为一种高性能、基于Java的RPC框架,是由阿里巴巴开放的。与许多RPC系统一样,Dubbo是基于定义服务的想法,指定可以远程调用其参数和返回类型的方法。在服务器端,服务器实现此接口并运行DUBBO服务器来处理客户端调用。在客户端上,客户端具有提供与服务器相同的方法的存根。
特性
- 面向接口代理的高性能RPC调用:提供高性能的基于代理的远程调用能力,服务以接口为粒度,为开发者屏蔽远程调用底层细节。
- 智能负载均衡:内置多种负载均衡策略,智能感知下游节点健康状况,显著减少调用延迟,提高系统吞吐量。
- 服务自动注册与发现:支持多种注册中心服务,服务实例上下线实时感知。
- 高度可扩展能力:遵循微内核+插件的设计原则,所有核心能力如Protocol、Transport、Serialization被设计为扩展点,平等对待内置实现和第三方实现。
- 运行期流量调度:内置条件、脚本等路由策略,通过配置不同的路由规则,轻松实现灰度发布,同机房优先等功能。
- 可视化的服务治理与运维:提供丰富服务治理、运维工具:随时查询服务元数据、服务健康状态及调用统计,实时下发路由策略、调整配置参数。
整体架构
说明
- config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类;
- proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory;
- registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService;
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance;
- monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService;
- protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter;
- exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer;
- transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec;
- serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool;
模块分包
模块说明:
- dubbo-common 公共逻辑模块:包括 Util 类和通用模型。
- dubbo-remoting 远程通讯模块:相当于 Dubbo 协议的实现,如果 RPC 用 RMI协议则不需要使用此包。
- dubbo-rpc 远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
- dubbo-cluster 集群模块:将多个服务提供方伪装为一个提供方,包括:负载均衡, 容错,路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
- dubbo-registry注册中心模块:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
- dubbo-monitor 监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。
- dubbo-config 配置模块:是 Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节。
- dubbo-container 容器模块:是一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要用 Web 容器去加载服务。
调用链
领域模型
-
Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
public interface Protocol { /** * Get default port when user doesn't config the port. * * @return default port */ int getDefaultPort(); /** * Export service for remote invocation: <br>暴露远程服务 * 1. Protocol should record request source address after receive a request: * RpcContext.getContext().setRemoteAddress();<br>协议在接收请求时,应记录请求来源方地址信息 * 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when * export the same URL<br>必须是幂等的,也就是暴露同一个 URL 的 Invoker 两次,和暴露一次没有区别 * 3. Invoker instance is passed in by the framework, protocol needs not to care <br>传入的 Invoker 由框架实现并传入,协议不需要关心 * * @param <T> Service type 服务的类型 * @param invoker Service invoker 服务的执行体 * @return exporter reference for exported service, useful for unexport the service later 暴露服务的引用,用于取消暴露 * @throws RpcException thrown when error occurs during export the service, for example: port is occupied当暴露服务出错时抛出,比如端口已占用 */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; /** * Refer a remote service: <br> 引用远程服务 * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol * needs to correspondingly execute `invoke()` method of `Invoker` object <br>当用户调用 refer() 所返回的 Invoker 对象的 invoke() 方法时,协议需相应执行同 URL 远端 export() 传入的 Invoker 对象的 invoke() 方法 * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking, * protocol sends remote request in the `Invoker` implementation. <br>refer() 返回的 Invoker 由协议实现,协议通常需要在此 Invoker 中发送远程请求 * 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when * connection fails.当 url 中有设置 check=false 时,连接失败不能抛出异常,并内部自动恢复 * * @param <T> Service type 服务的类型 * @param type Service class 服务的类型 * @param url URL address for the remote service 远程服务的URL地址 * @return invoker service's local proxy 服务的本地代理 * @throws RpcException when there's any error while connecting to the service provider 当连接服务提供方失败时抛出 */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; /** * Destroy protocol: <br>释放协议 * 1. Cancel all services this protocol exports and refers <br>取消该协议所有已经暴露和引用的服务 * 2. Release all occupied resources, for example: connection, port, etc. <br>释放协议所占用的所有资源,比如连接和端口 * 3. Protocol can continue to export and refer new service even after it's destroyed.协议在释放后,依然能暴露和引用新的服务 */ void destroy(); /** * Get all servers serving this protocol * * @return */ default List<ProtocolServer> getServers() { return Collections.emptyList(); } }
-
-
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
public interface Invoker<T> extends Node { /** * get service interface. * 获得Service接口 * @return service interface. */ Class<T> getInterface(); /** * invoke. * 方法调用 * @param invocation * @return result * @throws RpcException */ Result invoke(Invocation invocation) throws RpcException; }
-
-
Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
public interface Invocation { String getTargetServiceUniqueName(); String getProtocolServiceKey(); /** * get method name. * 获取方法名 * @return method name. * @serial */ String getMethodName(); /** * get the interface name * 获取接口名称 * @return */ String getServiceName(); /** * get parameter types. * 获得方法参数类型数组 * @return parameter types. * @serial */ Class<?>[] getParameterTypes(); /** * get parameter's signature, string representation of parameter types. * * @return parameter's signature */ default String[] getCompatibleParamSignatures() { return Stream.of(getParameterTypes()) .map(Class::getName) .toArray(String[]::new); } /** * get arguments. * 获得方法参数数组 * @return arguments. * @serial */ Object[] getArguments(); /** * get attachments. * 获得隐式参数相关 * @return attachments. * @serial */ Map<String, String> getAttachments(); @Experimental("Experiment api for supporting Object transmission") Map<String, Object> getObjectAttachments(); void setAttachment(String key, String value); @Experimental("Experiment api for supporting Object transmission") void setAttachment(String key, Object value); @Experimental("Experiment api for supporting Object transmission") void setObjectAttachment(String key, Object value); void setAttachmentIfAbsent(String key, String value); @Experimental("Experiment api for supporting Object transmission") void setAttachmentIfAbsent(String key, Object value); @Experimental("Experiment api for supporting Object transmission") void setObjectAttachmentIfAbsent(String key, Object value); /** * get attachment by key. * * @return attachment value. * @serial */ String getAttachment(String key); @Experimental("Experiment api for supporting Object transmission") Object getObjectAttachment(String key); /** * get attachment by key with default value. * * @return attachment value. * @serial */ String getAttachment(String key, String defaultValue); @Experimental("Experiment api for supporting Object transmission") Object getObjectAttachment(String key, Object defaultValue); /** * get the invoker in current context. * 获得对应的 Invoker 对象 * @return invoker. * @transient */ Invoker<?> getInvoker(); Object put(Object key, Object value); Object get(Object key); Map<Object, Object> getAttributes(); }
-
-
Result 是会话域,它持有调用过程中返回值,异常等。
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
public interface Result extends Serializable { /** * Get invoke result. * 获得返回值 * @return result. if no result return null. */ Object getValue(); void setValue(Object value); /** * Get exception. * 获得返回的异常 * @return exception. if no exception return null. */ Throwable getException(); void setException(Throwable t); /** * Has exception. * 是否有异常 * @return has exception. */ boolean hasException(); /** * Recreate. * <p> * <code> * if (hasException()) { * throw getException(); * } else { * return getValue(); * } * </code> * * @return result. * @throws if has exception throw it. */ Object recreate() throws Throwable; /** * get attachments. * 获得返回的隐式参数相关 * @return attachments. */ Map<String, String> getAttachments(); /** * get attachments. * * @return attachments. */ @Experimental("Experiment api for supporting Object transmission") Map<String, Object> getObjectAttachments(); /** * Add the specified map to existing attachments in this instance. * * @param map */ void addAttachments(Map<String, String> map); /** * Add the specified map to existing attachments in this instance. * * @param map */ @Experimental("Experiment api for supporting Object transmission") void addObjectAttachments(Map<String, Object> map); /** * Replace the existing attachments with the specified param. * * @param map */ void setAttachments(Map<String, String> map); /** * Replace the existing attachments with the specified param. * * @param map */ @Experimental("Experiment api for supporting Object transmission") void setObjectAttachments(Map<String, Object> map); /** * get attachment by key. * * @return attachment value. */ String getAttachment(String key); /** * get attachment by key. * * @return attachment value. */ @Experimental("Experiment api for supporting Object transmission") Object getObjectAttachment(String key); /** * get attachment by key with default value. * * @return attachment value. */ String getAttachment(String key, String defaultValue); /** * get attachment by key with default value. * * @return attachment value. */ @Experimental("Experiment api for supporting Object transmission") Object getObjectAttachment(String key, Object defaultValue); void setAttachment(String key, String value); @Experimental("Experiment api for supporting Object transmission") void setAttachment(String key, Object value); @Experimental("Experiment api for supporting Object transmission") void setObjectAttachment(String key, Object value); /** * Add a callback which can be triggered when the RPC call finishes. * <p> * Just as the method name implies, this method will guarantee the callback being triggered under the same context as when the call was started, * see implementation in {@link Result#whenCompleteWithContext(BiConsumer)} * * @param fn * @return */ Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn); <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn); Result get() throws InterruptedException, ExecutionException; Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
-
-
Filter过滤器接口。
-
1 2 3 4 5 6 7 8 9 10 11 12 13
@SPI public interface Filter { /** * Make sure call invoker.invoke() in your implementation.执行 Invoker 的过滤逻辑 */ Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; interface Listener { void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation); void onError(Throwable t, Invoker<?> invoker, Invocation invocation); } }
-
-
ProxyFactory代理工厂接口
- JavassistProxyFactory
- JdkProxyFactory
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
@SPI("javassist") public interface ProxyFactory { /** * create proxy. * 创建 Proxy ,在引用服务调用。 * @param invoker * @return proxy */ @Adaptive({PROXY_KEY}) <T> T getProxy(Invoker<T> invoker) throws RpcException; /** * create proxy. * 创建 Invoker ,在暴露服务时调用 * @param invoker * @return proxy */ @Adaptive({PROXY_KEY}) <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException; /** * create invoker. * * @param <T> * @param proxy Service 对象 * @param type Service 接口类型 * @param url Service 对应的 Dubbo URL * @return invoker */ @Adaptive({PROXY_KEY}) <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException; }
-
Exporter ,Invoker 暴露服务在 Protocol 上的对象。
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
public interface Exporter<T> { /** * get invoker.获得对应的 Invoker * * @return invoker */ Invoker<T> getInvoker(); /** * unexport.取消暴露 * <p> * <code> * getInvoker().destroy(); * </code> */ void unexport(); }
-
-
InvokerListener,Invoker 监听器。
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
@SPI public interface InvokerListener { /** * The invoker referred 当服务引用完成 * * @param invoker * @throws RpcException * @see org.apache.dubbo.rpc.Protocol#refer(Class, org.apache.dubbo.common.URL) */ void referred(Invoker<?> invoker) throws RpcException; /** * The invoker destroyed. 当服务销毁引用完成 * * @param invoker * @see org.apache.dubbo.rpc.Invoker#destroy() */ void destroyed(Invoker<?> invoker); }
-
-
ExporterListener ,Exporter 监听器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
@SPI public interface ExporterListener { /** * The exporter exported.当服务暴露完成 * * @param exporter * @throws RpcException * @see org.apache.dubbo.rpc.Protocol#export(Invoker) */ void exported(Exporter<?> exporter) throws RpcException; /** * The exporter unexported.当服务取消暴露完成 * * @param exporter * @throws RpcException * @see org.apache.dubbo.rpc.Exporter#unexport() */ void unexported(Exporter<?> exporter); }