在上一章中,我分析 MergeableClusterInvoker 的具体实现时,讲解过这样的内容:MergeableClusterInvoker 会读取 URL 中的 merger
参数值,如果 merger
参数以 "." 开头,则表示 "." 后的内容是一个方法名,这个方法名是远程目标方法的返回类型中的一个方法,MergeableClusterInvoker 在拿到所有 Invoker 返回的结果对象之后,会遍历每个返回结果,并调用 merger
参数指定的方法,合并这些结果值。
其实,除了上述指定 Merger 方法名称的合并方式之外,Dubbo 内部还提供了很多默认的 Merger 实现,这也就是本章将要分析的内容。本章我将详细介绍 MergerFactory 工厂类、Merger 接口以及针对 Java 常见数据类型的 Merger 实现。
一、Merger
Merger是一个 @SPI
注解的扩展接口:
// Merger.java
@SPI
public interface Merger<T> {
T merge(T... items);
}
Merger接口的类继承关系如下图,Java的常见数据类型都有对应的Merger实现:
1.1 MergerFactory
在 MergeableClusterInvoker 使用默认 Merger 实现的时候,会通过 MergerFactory 以及服务接口返回值类型选择合适的 Merger 实现。在 MergerFactory 中维护了一个 ConcurrentHashMap 集合,用来缓存服务接口返回值类型与 Merger 实例之间的映射关系。
MergerFactory.getMerger() 方法会根据传入的 returnType
类型,从缓存中查找相应的 Merger 实现,下面我们来看该方法的具体实现:
// MergerFactory.java
public class MergerFactory {
private static final ConcurrentMap<Class<?>, Merger<?>> MERGER_CACHE =
new ConcurrentHashMap<Class<?>, Merger<?>>();
public static <T> Merger<T> getMerger(Class<T> returnType) {
// returnType为空,直接抛出异常
if (returnType == null) {
throw new IllegalArgumentException("returnType is null");
}
Merger result;
// returnType为数组类型
if (returnType.isArray()) {
// 获取数组中元素的类型
Class type = returnType.getComponentType();
// 获取元素类型对应的Merger实现
result = MERGER_CACHE.get(type);
if (result == null) {
loadMergers();
result = MERGER_CACHE.get(type);
}
// 如果Dubbo没有提供元素类型对应的Merger实现,则返回ArrayMerger
if (result == null && !type.isPrimitive()) {
result = ArrayMerger.INSTANCE;
}
} else {
// 如果returnType不是数组类型,则直接从MERGER_CACHE缓存查找对应的Merger实例
result = MERGER_CACHE.get(returnType);
if (result == null) {
loadMergers();
result = MERGER_CACHE.get(returnType);
}
}
return result;
}
static void loadMergers() {
// 获取Merger接口的所有扩展名称
Set<String> names = ExtensionLoader.getExtensionLoader(Merger.class)
.getSupportedExtensions();
// 遍历所有Merger扩展实现
for (String name : names) {
Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
// 将Merger实例与对应returnType的映射关系记录到MERGER_CACHE集合中
MERGER_CACHE.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
}
}
}
上述的loadMergers() 方法会通过 Dubbo SPI 方式加载 Merger 接口全部扩展实现的名称,并填充到 MERGER_CACHE
集合中
二、内置Merger
前面说了,Java的常见数据类型都有对应的Merger实现,其中不仅有处理 boolean[]、byte[]、char[]、double[]、float[]、int[]、long[]、short[] 等 基础类型数组 的 Merger 实现,还有处理 List、Set、Map 等 集合类 的 Merger 实现。本节我就对几个Dubbo中内置的Merger进行分析。
2.1 ArrayMerger
当服务接口的返回值为数组的时候,会使用 ArrayMerger 将多个数组合并成一个数组,也就是将二维数组拍平成一维数组。ArrayMerger.merge() 方法的具体实现如下:
// ArrayMerger.java
public class ArrayMerger implements Merger<Object[]> {
public static final ArrayMerger INSTANCE = new ArrayMerger();
@Override
public Object[] merge(Object[]... items) {
// 传入的结果集合为空,则直接返回空数组
if (ArrayUtils.isEmpty(items)) {
return new Object[0];
}
// 查找第一个不为null的结果
int i = 0;
while (i < items.length && items[i] == null) {
i++;
}
// 所有items数组中全部结果都为null,则直接返回空数组
if (i == items.length) {
return new Object[0];
}
Class<?> type = items[i].getClass().getComponentType();
int totalLen = 0;
for (; i < items.length; i++) {
// 忽略为null的结果
if (items[i] == null) {
continue;
}
Class<?> itemType = items[i].getClass().getComponentType();
// 保证类型相同
if (itemType != type) {
throw new IllegalArgumentException("Arguments' types are different");
}
totalLen += items[i].length;
}
// 确定最终数组的长度
if (totalLen == 0) {
return new Object[0];
}
Object result = Array.newInstance(type, totalLen);
// 遍历全部的结果数组,将items二维数组中的每个元素都加到result中,形成一维数组
int index = 0;
for (Object[] array : items) {
if (array != null) {
for (int j = 0; j < array.length; j++) {
Array.set(result, index++, array[j]);
}
}
}
return (Object[]) result;
}
}
2.2 IntArrayMerger
其他基础数据类型数组的 Merger 实现,与 ArrayMerger 的实现非常类似,都是将相应类型的二维数组拍平成同类型的一维数组,这里以 IntArrayMerger 为例进行分析:
// IntArrayMerger.java
public class IntArrayMerger implements Merger<int[]> {
@Override
public int[] merge(int[]... items) {
if (ArrayUtils.isEmpty(items)) {
// 检测传入的多个int[]不能为空
return new int[0];
}
// 直接使用Stream的API将多个int[]数组拍平成一个int[]数组
return Arrays.stream(items).filter(Objects::nonNull)
.flatMapToInt(Arrays::stream)
.toArray();
}
}
剩余的其他基础类型的 Merger 实现类,例如,FloatArrayMerger、IntArrayMerger、LongArrayMerger、BooleanArrayMerger、ByteArrayMerger、CharArrayMerger、DoubleArrayMerger 等,这里就不再赘述,你若感兴趣的话可以参考源码进行学习。
2.3 MapMerger
SetMerger、ListMerger 和 MapMerger 是针对 Set 、List 和 Map 返回值的 Merger 实现,它们会将多个 Set(或 List、Map)集合合并成一个 Set(或 List、Map)集合,核心原理与 ArrayMerger 的实现类似。这里我简单介绍下 MapMerger 的核心实现:
// MapMerger.java
public class MapMerger implements Merger<Map<?, ?>> {
@Override
public Map<?, ?> merge(Map<?, ?>... items) {
if (ArrayUtils.isEmpty(items)) {
// 空结果集时,这就返回空Map
return Collections.emptyMap();
}
// 将items中所有Map集合中的KV,添加到result这一个Map集合中
Map<Object, Object> result = new HashMap<Object, Object>();
Stream.of(items).filter(Objects::nonNull).forEach(result::putAll);
return result;
}
}
2.4 SetMerger
接下来再看 SetMerger 和 ListMerger 的核心实现:
// SetMerger.java
public class SetMerger implements Merger<Set<?>> {
@Override
public Set<Object> merge(Set<?>... items) {
if (ArrayUtils.isEmpty(items)) {
return Collections.emptySet();
}
Set<Object> result = new HashSet<Object>();
Stream.of(items).filter(Objects::nonNull).forEach(result::addAll);
return result;
}
}
// ListMerger.java
public class ListMerger implements Merger<List<?>> {
@Override
public List<Object> merge(List<?>... items) {
if (ArrayUtils.isEmpty(items)) {
return Collections.emptyList();
}
// 通过Stream API将传入的所有List集合拍平成一个List集合并返回
return Stream.of(items).filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
}
三、自定义 Merger
我们可以尝试写一个自己的 Merger 实现,这里我以 dubbo-demo-xml
中的 Provider 和 Consumer 为例进行修改。
首先我们在 dubbo-demo-xml-provider
示例模块中发布两个服务,分别属于 groupA 和 groupB,相应的 dubbo-provider.xml
配置如下:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application metadata-type="remote" name="demo-provider"/>
<dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<dubbo:protocol name="dubbo"/>
<!-- 配置两个Spring Bean -->
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<bean id="demoServiceB" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<!-- 将demoService和demoServiceB两个Spring Bean作为服务发布出去,分别属于groupA和groupB-->
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" group="groupA"/>
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoServiceB" group="groupB"/>
</beans>
接下来,在 dubbo-demo-xml-consumer
示例模块中进行服务引用,dubbo-consumer.xml
配置文件的具体内容如下:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-consumer"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- 引用DemoService,这里指定了group为*,即可以引用任何group的Provider,同时merger设置为true,即需要对结果进行合并-->
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" group="*" merger="true"/>
</beans>
然后,在 dubbo-demo-xml-consumer
示例模块的 /resources/META-INF/dubbo
目录下,添加一个名为 org.apache.dubbo.rpc.cluster.Merger
的 Dubbo SPI 配置文件,其内容如下:
String = org.apache.dubbo.demo.consumer.StringMerger
StringMerger 实现了前面介绍的 Merger 接口,它 会将多个 Provider 节点返回的 String 结果值拼接起来 ,具体实现如下:
public class StringMerger implements Merger<String> {
@Override
public String merge(String... items) {
// 检测空返回值
if (ArrayUtils.isEmpty(items)) {
return "";
}
// 通过竖线将多个Provider的返回值拼接起来
String result = "";
for (String item : items) {
result += item + "|";
}
return result;
}
}
最后,依次启动 Zookeeper、dubbo-demo-xml-provider
示例模块和 dubbo-demo-xml-consumer
示例模块。在控制台中会看到如下输出:
result: Hello world, response from provider: 172.17.108.179:20880|Hello world, response from provider: 172.17.108.179:20880|
四、总结
本章,我重点介绍了 MergeableCluster 中涉及的 Merger 合并器相关的知识点。
- 首先,我介绍了 MergerFactory 工厂类的核心功能,它可以配合远程方法调用的返回值,选择对应的 Merger 实现,完成结果的合并;
- 然后,我深入分析了 Dubbo 自带的 Merger 实现类,涉及 Java 中各个基础类型数组的 Merger 合并器实现,例如,IntArrayMerger、LongArrayMerger 等,它们都是将多个特定类型的一维数组拍平成相同类型的一维数组。除了这些基础类型数组的 Merger 实现,Dubbo 还提供了 List、Set、Map 等集合类的 Merger 实现,它们的核心是将多个集合中的元素整理到一个同类型的集合中。
- 最后,我以 StringMerger 为例,介绍了如何自定义 Merger 合并器。