2023-08-16
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

通过前面章节对 DubboProtocol 的讲解,我们知道:

  • 对于服务暴露:上层业务 Bean 会被封装成 Invoker 对象,然后传入 DubboProtocol.export() 方法中,Invoker 会被封装成 DubboExporter,并缓存到 exporterMap 集合中。在 DubboProtocol 暴露的 ProtocolServer 收到请求时,经过一系列解码处理,最终会到达 DubboProtocol.requestHandler 这个 ExchangeHandler 对象中,该 ExchangeHandler 对象会从 exporterMap 集合中取出 Invoker,并调用其 invoke() 方法处理请求;
  • 对于服务引用:DubboProtocol.protocolBindingRefer() 方法会将底层的 ExchangeClient 集合封装成 DubboInvoker,然后由上层逻辑封装成代理对象,这样业务层就可以像调用本地 Bean 一样,完成远程调用。

本章,我就对 Invoker 进行深入分析,其类继承图如下:

202308162142274701.png

一、AbstractInvoker

首先,我们来看 AbstractInvoker 这个抽象类。

1.1 核心字段

AbstractInvoker 的核心字段如下:

    // AbstractInvoker.java
    
    public abstract class AbstractInvoker<T> implements Invoker<T> {
        // 该 Invoker 对象封装的业务接口类型
        private final Class<T> type;
    
        // 当前 Invoker 关联的 URL 对象,其中包含了全部的配置信息
        private final URL url;
    
        // 当前 Invoker 关联的一些附加信息,这些附加信息可以来自关联的 URL
        private final Map<String, Object> attachment;
    
        // Invoker 的可用状态
        private volatile boolean available = true;
    
        // Invoker 的销毁状态
        private AtomicBoolean destroyed = new AtomicBoolean(false);
    }

1.2 invoke()实现

AbstractInvoker 实现了 Invoker 接口中的 invoke() 方法,采用了设计模式中的 模板方法模式

  1. 首先,对 URL 中的配置信息以及 RpcContext 中携带的信息进行处理,添加到 Invocation 中作为附加信息;
  2. 然后,调用 doInvoke() 方法发起远程调用(该方法由 AbstractInvoker 的子类实现),最后得到 AsyncRpcResult 对象返回。
    // AbstractInvoker.java
    
    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // 1.将传入的Invocation转换为RpcInvocation
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
    
        // 2.将attachment集合添加为Invocation的附加信息
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addObjectAttachmentsIfAbsent(attachment);
        }
    
        // 3.将RpcContext的附加信息添加为Invocation的附加信息
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            invocation.addObjectAttachments(contextAttachments);
        }
    
        // 4.设置此次调用的模式,异步/同步
        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        // 如果是异步调用,给这次调用添加一个唯一ID
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
        AsyncRpcResult asyncResult;
        try {
            // 5.调用子类实现的doInvoke()方法
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
        // 异步调用设置Future
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }

二、RpcContext

RpcContext 是线程级别的上下文信息 ,每个线程绑定一个 RpcContext 对象,底层依赖 ThreadLocal 实现。RpcContext 主要用于存储一个线程中一次请求的临时状态,当线程处理新的请求(Provider 端)或是线程发起新的请求(Consumer 端)时,RpcContext 中存储的内容就会更新。

2.1 InternalThreadLocal

下面来看 RpcContext 中两个InternalThreadLocal类型的核心字段,这两个字段的定义如下所示:

    // RpcContext.java
    
    public class RpcContext {
    
        /**
         * 发起请求时,使用该RpcContext来存储上下文信息
         */
        private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
            @Override
            protected RpcContext initialValue() {
                return new RpcContext();
            }
        };
    
        /**
         * 接收响应时,使用该RpcContext来存储上下文信息
         */
        private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() {
            @Override
            protected RpcContext initialValue() {
                return new RpcContext();
            }
        };
        //...
    }

Dubbo 的 InternalThreadLocal 与 JDK 提供的 ThreadLocal 功能类似,只是底层实现略有不同,其底层的 InternalThreadLocalMap 采用数组结构存储数据,直接通过 index 获取变量,相较于 Map 方式计算 hash 值的性能更好。

2.2 InternalThread

InternalThreadLocal 需要与 InternalThread 类搭配使用才能发挥其性能优势。InternalThread 继承了 Thread 类,Dubbo 的线程工厂 NamedInternalThreadFactory 创建的线程类其实都是 InternalThread 实例对象。

InternalThread 中提供了 setThreadLocalMap()threadLocalMap() 两个方法,用于设置和获取 InternalThreadLocalMap 。InternalThreadLocalMap 中的核心字段有如下四个。

    // InternalThreadLocalMap.java
    
    public final class InternalThreadLocalMap {
    
        // 用于存储绑定到当前线程的数据
        private Object[] indexedVariables;
    
        // 当使用原生 Thread 的时候,会使用该 ThreadLocal 存储 InternalThreadLocalMap,这是一个降级策略。
        private static ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    
        // 自增索引,用于计算下次存储到 indexedVariables 数组中的位置,这是一个静态字段
        private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    
        // 当一个与线程绑定的值被删除之后,会被设置为 UNSET 值
        public static final Object UNSET = new Object();
    }

在 InternalThreadLocalMap 中,获取当前线程绑定的InternalThreadLocaMap的静态方法,都会与 slowThreadLocalMap 字段配合实现降级,也就是说,如果当前线程为原生 Thread 类型,则根据 slowThreadLocalMap 获取InternalThreadLocalMap。以 getIfSet() 方法为例:

    // HUD.java
    
    public static InternalThreadLocalMap getIfSet() {
    
        // 获取当前线程
        Thread thread = Thread.currentThread(); 
    
        // 判断当前线程的类型
        if (thread instanceof InternalThread) { 
            // 如果是InternalThread类型,直接获取InternalThreadLocalMap返回
            return ((InternalThread) thread).threadLocalMap();
        }
    
        // 原生Thread则需要通过ThreadLocal获取InternalThreadLocalMap
        return slowThreadLocalMap.get(); 
    
    }

在拿到 InternalThreadLocalMap 对象之后,我们就可以调用其 setIndexedVariable() 方法和 indexedVariable() 方法读写,这里我结合InternalThreadLocal进行讲解。

在 InternalThreadLocal 的构造方法中,会使用 InternalThreadLocalMap.NEXT_INDEX 初始化其 index 字段(int 类型),在 InternalThreadLocal.set() 方法中就会将传入的数据存储到 InternalThreadLocalMap.indexedVariables 集合中,具体的下标位置就是这里的 index 字段值:

    // InternalThreadLocal.jva
    
    public final void set(V value) {
        // 如果要存储的值为null或是UNSERT,则直接清除
        if (value == null|| value == InternalThreadLocalMap.UNSET){
            remove(); 
        } 
        // 获取当前线程绑定的InternalThreadLocalMap    
        else {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            // 将value存储到InternalThreadLocalMap.indexedVariables集合中
            if (threadLocalMap.setIndexedVariable(index, value)) {
                // 将当前InternalThreadLocal记录到待删除集合中
                addToVariablesToRemove(threadLocalMap, this);
            }
        }
    }

InternalThreadLocal 的静态变量 VARIABLES_TO_REMOVE_INDEX 是调用 InternalThreadLocalMap.nextVariableIndex() 方法得到的一个索引值,在 InternalThreadLocalMap 数组的对应位置保存的是 Set<InternalThreadLocal> 类型的集合,也就是上面提到的“待删除集合”,即绑定到当前线程所有的 InternalThreadLocal,这样就可以方便管理对象及内存的释放。

接下来我们继续看 InternalThreadLocalMap.setIndexedVariable() 方法的实现:

    // InternalThreadLocalMap.java
    
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
    
        if (index < lookup.length) { 
            // 将value存储到index指定的位置
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET; 
        } else {
            // 当index超过indexedVariables数组的长度时,需要对indexedVariables数组进行扩容
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }

明确了设置 InternalThreadLocal 变量的流程之后,我们再来分析读取 InternalThreadLocal 变量的流程,入口在 InternalThreadLocal 的 get() 方法:

    // InternalThreadLocal.java
    
    public final V get() {
    
        // 获取当前线程绑定的InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    
        // 根据当前InternalThreadLocal对象的index字段,从InternalThreadLocalMap中读取相应的数据
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v; // 如果非UNSET,则表示读取到了有效数据,直接返回
        }
    
        // 读取到UNSET值,则会调用initialize()方法进行初始化,其中首先会调用initialValue()方法进行初始化,然后会调用前面介绍的setIndexedVariable()方法和addToVariablesToRemove()方法存储初始化得到的值
        return initialize(threadLocalMap);
    }

我们可以看到,在 RpcContext 中,LOCAL 和 SERVER_LOCAL 两个 InternalThreadLocal 类型的字段都实现了 initialValue() 方法,它们的实现都是创建并返回 RpcContext 对象。

理解了 InternalThreadLocal 的底层原理之后,我们回到 RpcContext 继续分析。RpcContext 作为调用的上下文信息,可以记录非常多的信息,下面介绍其中的一些核心字段。

  • attachments(Map 类型):可用于记录调用上下文的附加信息,这些信息会被添加到 Invocation 中,并传递到远端节点。
  • values(Map 类型):用来记录上下文的键值对信息,但是不会被传递到远端节点。
  • methodName、parameterTypes、arguments:分别用来记录调用的方法名、参数类型列表以及具体的参数列表,与相关 Invocation 对象中的信息一致。
  • localAddress、remoteAddress(InetSocketAddress 类型):记录了自己和远端的地址。
  • request、response(Object 类型):可用于记录底层关联的请求和响应。
  • asyncContext(AsyncContext 类型):异步Context,其中可以存储异步调用相关的 RpcContext 以及异步请求相关的 Future。

三、DubboInvoker

通过前面对 DubboProtocol 的分析我们知道,protocolBindingRefer() 方法会根据调用的业务接口类型以及 URL 创建底层的 ExchangeClient 集合,然后封装成 DubboInvoker 对象返回。本节,我对DubboInvoker的请求调用流程进行讲解,DubboInvoker 支持同步、异步两种调用模式。

3.1 请求调用

DubboInvoker 是 AbstractInvoker 的实现类,在其 doInvoke() 方法中首先会选择此次调用使用 ExchangeClient 对象,然后确定此次调用是否需要返回值,最后调用 ExchangeClient.request() 方法发送请求,对返回的 Future 进行简单封装并返回:

    // DubboInvoker.java
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
    
        RpcInvocation inv = (RpcInvocation) invocation;
    
        // 此次调用的方法名称
        final String methodName = RpcUtils.getMethodName(invocation);
    
        // 向Invocation中添加附加信息,这里将URL的path和version添加到附加信息中
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
    
        ExchangeClient currentClient; // 选择一个ExchangeClient实例
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
    
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 根据调用的方法名称和配置计算此次调用的超时时间
        int timeout = calculateTimeout(invocation, methodName); 
    
        // 不需要关注返回值的请求
        if (isOneway) { 
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } 
        // 需要关注返回值的请求
        else { 
            // 获取处理响应的线程池,对于同步请求,会使用ThreadlessExecutor,ThreadlessExecutor的原理前面已经分析过了,这里不再赘述;对于异步请求,则会使用共享的线程池,ExecutorRepository接口的相关设计和实现在前面已经详细分析过了,这里不再重复。
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
    
            // 使用上面选出的ExchangeClient执行request()方法,将请求发送出去
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
    
            // 这里将AppResponse封装成AsyncRpcResult返回
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    }

在 DubboInvoker.invoke() 方法中有一些细节需要关注一下。首先是根据 URL 以及 Invocation 中的配置,决定此次调用是否为 oneway 调用方式

    // DubboInvoker.java
    
    public static boolean isOneway(URL url, Invocation inv) {
    
        boolean isOneway;
    
        if (Boolean.FALSE.toString().equals(inv.getAttachment(RETURN_KEY))) {
            // 首先关注的是Invocation中"return"这个附加属性
            isOneway = true; 
        } else {
            // 之后关注URL中,调用方法对应的"return"配置
            isOneway = !url.getMethodParameter(getMethodName(inv), RETURN_KEY, true); 
        }
        return isOneway;
    }

oneway 指的是客户端发送消息后,不需要得到响应。所以,对于那些不关心服务端响应的请求,就比较适合使用 oneway 通信。

发送 oneway 请求的方式是send() 方法,而后面发送 twoway 请求的方式是 request() 方法。

202308162142279742.png

通过之前的分析我们知道,request() 方法会相应地创建 DefaultFuture 对象以及检测超时的定时任务,而 send() 方法则不会创建这些东西,它是直接将 Invocation 包装成 oneway 类型的 Request 发送出去。

在服务端的 HeaderExchangeHandler.receive() 方法中,会针对 oneway 请求和 twoway 请求执行不同的分支处理:

  • twoway 请求由 handleRequest() 方法进行处理,其中会关注调用结果并形成 Response 返回给客户端;
  • oneway 请求则直接交给上层的 DubboProtocol.requestHandler,完成方法调用之后,不会返回任何 Response。

202308162142289783.png

在 Client 端发送请求时,首先会创建对应的 DefaultFuture(其中记录了请求 ID 等信息),然后依赖 Netty 的异步发送特性将请求发送到 Server 端。需要说明的是,这整个发送过程是不会阻塞任何线程的。之后,将 DefaultFuture 返回给上层,在这个返回过程中,DefaultFuture 会被封装成 AsyncRpcResult,同时也可以添加回调函数。

当 Client 端接收到响应结果的时候,会交给关联的线程池(ExecutorService)或是业务线程(使用 ThreadlessExecutor 场景)进行处理,得到 Server 返回的真正结果。拿到真正的返回结果后,会将其设置到 DefaultFuture 中,并调用 complete() 方法将其设置为完成状态。此时,就会触发前面注册在 DefaulFuture 上的回调函数,执行回调逻辑。

四、总结

本章,我对Dubbo RPC 模块中的Invoker接口及实现进行了深入分析。首先,我们介绍了 AbstractInvoker 抽象类提供的公共能力,然后分析了 RpcContext 的功能和涉及的组件,例如,InternalThreadLocal、InternalThreadLocalMap 等;最后,我对 DubboInvoker 这个实现类进行了讲解,并分别针对 oneway 和 twoway 两种类型的请求进行了分析。

阅读全文