2023-08-16
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/372

在前面章节,我已经深入分析了 dubbo-remoting模块的Transport子层,并对这一子层的核心接口,以及Client和Server的底层实现进行了讲解。本章,我将对 dubbo-remoting模块的Exchange子层进行分析,Exchange层是Transport层的上一层,即 Dubbo Remoting 层中的最顶层。

Dubbo 将信息交换行为抽象成 Exchange 层,官方文档对这一层的说明是:封装了请求-响应的语义,即关注一问一答的交互模式,实现了同步转异步 。在 Exchange 这一子层,以 RequestResponse 为中心,针对 Channel、ChannelHandler、Client、RemotingServer 等接口进行实现。

202308162141595491.png

一、Request/Response

Exchange 子层的 RequestResponse 是该层的核心对象,是对请求和响应的抽象。

1.1 Request

Request定义在org.apache.dubbo.remoting.exchange,是对请求的抽象:

    public class Request {
    
        // 用于生成请求的自增ID,当递增到Long.MAX_VALUE之后,会溢出到Long.MIN_VALUE,可以继续使用该负数作为消息ID
        private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    
        // 请求的ID
        private final long mId;
    
        // 请求版本号
        private String mVersion;
    
        // 双向请求标识,如果设置为true,则Server端收到请求后,需要给Client返回一个响应
        private boolean mTwoWay = true;
    
        // 事件标识,例如心跳请求、只读请求等,都会带有这个标识
        private boolean mEvent = false;
    
        // 请求发送到Server之后,由Decoder将二进制数据解码成Request对象,如果解码环节遇到异常,则会设置该标识
        private boolean mBroken = false;
    
        // 请求体,可以是任何Java类型的对象,也可以是null
        private Object mData;
    
        public Request() {
            mId = newId();
        }
    
        public Request(long id) {
            mId = id;
        }
    
        private static long newId() {
            return INVOKE_ID.getAndIncrement();
        }
        //...
    }

1.2 Response

Response定义在org.apache.dubbo.remoting.exchange,是对响应的抽象:

    public class Response {
    
        // 响应ID,与相应请求的ID一致
        private long mId = 0;
    
        // 当前协议的版本号,与请求消息的版本号一致
        private String mVersion;
    
        // 响应状态码,有OK、CLIENT_TIMEOUT、SERVER_TIMEOUT等10多个可选值
        private byte mStatus = OK;
    
        // 事件标识
        private boolean mEvent = false;
    
        // 可读的错误响应消息
        private String mErrorMsg;
    
        // 响应体
        private Object mResult;
    
        public Response() {
        }
    
        public Response(long id) {
            mId = id;
        }
    
        public Response(long id, String version) {
            mId = id;
            mVersion = version;
        }
        //...
    }

二、ExchangeChannel

在 Exchange 层中定义了 ExchangeChannel 接口,它在 Channel 接口之上抽象了 Exchange 层的网络连接:

    public interface ExchangeChannel extends Channel {
    
        /**
         * 发送请求
         */
        CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;
    
        /**
         * 发送请求,可指定超时时间
         */
        CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;
    
        /**
         * 返回ExchangeHandler
         */
        ExchangeHandler getExchangeHandler();
    
        /**
         * 优雅关闭
         */
        @Override
        void close(int timeout);
    }

2.1 HeaderExchangeChannel

HeaderExchangeChannel 是 ExchangeChannel 的实现类:

202308162142013732.png

HeaderExchangeChannel 本身是 Channel 的装饰器,封装了一个 Channel 对象,它的 send() 方法和 request() 方法底层都是委托内部的Channel 对象实现的:

    // HeaderExchangeChannel.java
    
    final class HeaderExchangeChannel implements ExchangeChannel {
    
        private final Channel channel;
    
        private volatile boolean closed = false;
    
        HeaderExchangeChannel(Channel channel) {
            if (channel == null) {
                throw new IllegalArgumentException("channel == null");
            }
            this.channel = channel;
        }
    
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
            }
            if (message instanceof Request
                    || message instanceof Response
                    || message instanceof String) {
                channel.send(message, sent);
            } else {
                Request request = new Request();
                request.setVersion(Version.getProtocolVersion());
                request.setTwoWay(false);
                request.setData(message);
                channel.send(request, sent);
            }
        }
    
        @Override
        public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // 创建请求
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
            try {
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            // 返回一个 DefaultFuture 对象
            return future;
        }
        //...
    }

注意上述的request()方法,首先创建一个Request请求,然后通过底层的Channel发送出去,最后返回的是一个DefaultFuture对象。

2.2 DefaultFuture

DefaultFuture 继承了 JDK 中的 CompletableFuture:

    public class DefaultFuture extends CompletableFuture<Object> {
    
        // 管理请求与 Channel 之间的关联关系,Key 为请求ID,Value 为发送请求的 Channel
        private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
    
        // 管理请求与 DefaultFuture 之间的关联关系,Key 为请求ID,Value 为请求对应的 Future
        private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    
        // 时间轮,用于处理超时
        public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
                new NamedThreadFactory("dubbo-future-timeout", true),
                30,
                TimeUnit.MILLISECONDS);
    
        // 请求的ID
        private final Long id;
    
        // 请求
        private final Request request;
    
        // 发送请求的 Channel
        private final Channel channel;
    
        // 整个请求-响应交互完成的超时时间
        private final int timeout;
    
        // 该 DefaultFuture 的创建时间
        private final long start = System.currentTimeMillis();
    
        // 请求发送的时间
        private volatile long sent;
    
        // 该定时任务到期时,表示对端响应超时
        private Timeout timeoutCheckTask;
    
        // 请求关联的线程池
        private ExecutorService executor;
    
        //...
    }

再来看DefaultFuture对象的创建,是通过DefaultFuture.newFuture()方法:

    // DefaultFuture.java
    
    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
    
        // 对于ThreadlessExecutor的特殊处理
        // ThreadlessExecutor可以关联一个waitingFuture,就是这里创建DefaultFuture对象
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        // 创建一个定时任务,用处理响应超时的情况
        timeoutCheck(future);
        return future;
    }

HeaderExchangeChannel在发送请求的过程中,会触发沿途的 ChannelHandler.sent() 方法。其中的 HeaderExchangeHandler 会调用 DefaultFuture.sent() 方法更新 sent 字段,记录请求发送的时间戳,后续如果响应超时,则会将该发送时间戳添加到提示信息中。

当 Consumer 收到并读取对端返回的响应后,会触发 Dubbo Channel 中的各个 ChannelHandler 的 received() 方法。例如,AllChannelHandler 会将后续 ChannelHandler.received() 方法的调用封装成任务提交到线程池中,响应会提交到 DefaultFuture 关联的线程池中。

当响应传递到 HeaderExchangeHandler 时,会通过调用 HeaderExchangeHandler.handleResponse() 方法进行处理,该方法内部调用了 DefaultFuture.received() 方法,会找到响应关联的 DefaultFuture 对象(根据请求ID 从 FUTURES 集合查找)并调用 doReceived() 方法,将 DefaultFuture 设置为完成状态。

    // DefaultFuture.java
    
    public static void received(Channel channel, Response response, boolean timeout) {
        try {
            // 拿到关联的Future
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                // 未超时,取消定时任务
                if (!timeout) {
                    t.cancel();
                }
                future.doReceived(response);
            } else {
                // 记录日志
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        // 正常响应
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } 
        // 超时
        else if (res.getStatus() == Response.CLIENT_TIMEOUT 
                   || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } 
        // 其他异常
        else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
    
        // 针对ThreadlessExecutor的兜底处理,防止业务线程一直阻塞在ThreadlessExecutor上
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                // notifyReturn()方法会向ThreadlessExecutor提交一个任务
                // 这样业务线程就不会阻塞了,提交的任务会尝试将DefaultFuture设置为异常结束
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }

再来看响应超时的场景。在创建 DefaultFuture 时调用的 timeoutCheck() 方法中,会创建 TimeoutCheckTask 定时任务,并添加到时间轮中:

    // DefaultFuture.java
    
    private static void timeoutCheck(DefaultFuture future) {
        TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
        // 所有 DefaultFuture 对象共用一个TIME_OUT_TIMER时间轮
        future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
    }

TimeoutCheckTask 是 DefaultFuture 中的内部类,实现了 TimerTask 接口,可以提交到时间轮中等待执行。当响应超时的时候,TimeoutCheckTask 会创建一个 Response,并调用 DefaultFuture.received() 方法:

    // TimeoutCheckTask.java
    
    private static class TimeoutCheckTask implements TimerTask {
    
        private final Long requestID;
    
        TimeoutCheckTask(Long requestID) {
            this.requestID = requestID;
        }
    
        @Override
        public void run(Timeout timeout) {
            // 检查该任务关联的DefaultFuture对象是否已经完成
            DefaultFuture future = DefaultFuture.getFuture(requestID);
            if (future == null || future.isDone()) {
                return;
            }
    
            // 提交到线程池执行,注意ThreadlessExecutor的情况
            if (future.getExecutor() != null) {
                future.getExecutor().execute(() -> notifyTimeout(future));
            } else {
                notifyTimeout(future);
            }
        }
    
        private void notifyTimeout(DefaultFuture future) {
            // 没有收到对端的响应,这里会创建一个Response,表示超时的响应
            Response timeoutResponse = new Response(future.getId());
            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
            // 将关联的DefaultFuture标记为超时异常完成
            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
        }
    }

三、HeaderExchangeHandler

无论是发送请求还是处理响应,都会涉及HeaderExchangeHandler, HeaderExchangeHandler 是 ExchangeHandler 的装饰器 ,内部维护了一个 ExchangeHandler 对象。

3.1 继承体系

ExchangeHandler 接口是 Exchange 层与上层交互的接口之一:

  • 上层调用方可以实现该接口完成自身的功能,然后再由 HeaderExchangeHandler 修饰,具备 Exchange 层处理 Request-Response 的能力;
  • 最后再由 Transport层的 ChannelHandler 修饰,具备 Transport 层的能力。如下图所示:

202308162142019233.png

HeaderExchangeHandler 作为一个装饰器,其 connected()、disconnected()、sent()、received()、caught() 方法最终都会转发给上层提供的 ExchangeHandler 进行处理。这里我们需要聚焦的是 HeaderExchangeHandler 本身对 Request 和 Response 的处理逻辑。

3.2 receive方法

202308162142024664.png

结合上图,我们可以看到在received() 方法中,HeaderExchangeHandler 对收到的消息进行了分类处理:

  • 只读请求:由 handlerEvent() 方法 进行处理,它会在 Channel 上设置 channel.readonly 标志,后续的上层调用中会读取该值;
  • 双向请求:由 handleRequest() 方法 进行处理,先对解码失败的请求进行处理,返回异常响应。然后将正常解码的请求交给上层实现的 ExchangeHandler 进行处理,并添加回调。上层 ExchangeHandler 处理完请求后,会触发回调,根据处理结果填充响应结果和响应码,并向对端发送;
  • 单向请求:直接委托给 上层 ExchangeHandler.received() 方法 进行处理,由于不需要响应,HeaderExchangeHandler 不会关注处理结果;
  • Response: HeaderExchangeHandler 会通过 handleResponse() 方法 将关联的 DefaultFuture 设置为完成状态(或是异常完成状态);
  • String 类型消息:HeaderExchangeHandler 会根据当前服务的角色进行分类,具体与 Dubbo 对 telnet 的支持相关,后面的章节我会详细介绍。

3.3 sent()方法

HeaderExchangeHandler.sent()方法,会通知上层 ExchangeHandler 实现的 sent() 方法,同时还会针对 Request 请求调用 DefaultFuture.sent() 方法记录请求的具体发送时间。

3.4 connected()方法

HeaderExchangeHandler.connected()方法,会为 Dubbo Channel 创建相应的 HeaderExchangeChannel,并将两者绑定,然后通知上层 ExchangeHandler 处理 connect 事件。

3.5 disconnected()方法

HeaderExchangeHandler.disconnected()方法,会首先通知上层 ExchangeHandler 进行处理,之后在 DefaultFuture.closeChannel() 通知 DefaultFuture 连接断开(其实就是创建并传递一个 Response,该 Response 的状态码为 CHANNEL_INACTIVE),这样就不会继续阻塞业务线程了,最后再将 HeaderExchangeChannel 与底层的 Dubbo Channel 解绑。

四、HeaderExchangeClient

HeaderExchangeClient 是 Client 的装饰器,主要为其装饰的 Client 添加两个功能:

  • 维持与 Server 的长连接,通过 定时发送心跳消息 实现;
  • 故障掉线之后进行重连,通过 定时检查连接状态 实现。

202308162142035905.png

4.1 核心字段

HeaderExchangeClient 中有两个核心字段clientchannel,都是底层被装饰的对象:

    // HeaderExchangeClient.java
    
    public class HeaderExchangeClient implements ExchangeClient {
    
        // 被修饰的Client对象。HeaderExchangeClien中对Client接口的实现,都会委托给该对象进行处理
        private final Client client;
    
        // 被修饰的ExchangeChannel对象,代表了Client与服务端的连接
        // HeaderExchangeClient 中对 ExchangeChannel 接口的实现,都会委托给该对象进行处理。
        private final ExchangeChannel channel;
    
        // 时间轮
        private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
                new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
        // 心跳
        private HeartbeatTimerTask heartBeatTimerTask;
        // 重连
        private ReconnectTimerTask reconnectTimerTask;
    
        public HeaderExchangeClient(Client client, boolean startTimer) {
            Assert.notNull(client, "Client can't be null");
            this.client = client;
            this.channel = new HeaderExchangeChannel(client);
            // 如果启动定时任务
            if (startTimer) {
                URL url = client.getUrl();
                startReconnectTask(url);
                startHeartBeatTask(url);
            }
        }
    }

4.2 心跳和重连

HeaderExchangeClient 侧重定时轮资源的分配、定时任务的创建和取消。我们来看startReconnectTaskstartHeartBeatTask这两个方法。

startReconnectTask

HeaderExchangeClient.startReconnectTask()方法,启动一个断线重连定时任务:

    // HeaderExchangeClient.java
    
    private void startReconnectTask(URL url) {
        // 解析URL参数,判断是否需要断线重连
        if (shouldReconnect(url)) {
            AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
    
            // 解析参数中的断线时长
            int idleTimeout = getIdleTimeout(url);
            // 计算检测间隔
            long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
    
            // 创建断线重连定时任务,并提交到时间轮
            this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout);
            IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
        }
    }

startHeartBeatTask

HeaderExchangeClient.startHeartBeatTask()方法,启动一个心跳定时任务:

    // HeaderExchangeClient.java
    
    private void startHeartBeatTask(URL url) {
        // Client的具体实现决定是否启动该心跳任务
        if (!client.canHandleIdle()) {    // 返回true表示该Client实现可以自己发送心跳请求,无须再启动一个定时任务
            AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
    
            // 计算心跳间隔,不能低于1s
            int heartbeat = getHeartbeat(url);
            long heartbeatTick = calculateLeastDuration(heartbeat);
    
            // 创建心跳任务,提交到时间轮
            this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
            IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
        }
    }

HeartbeatTimerTask 心跳定时任务的类继承图如下:

202308162142047546.png

先来看AbstractTimerTask:

    // AbstractTimerTask.java
    
    public abstract class AbstractTimerTask implements TimerTask {
    
        // ChannelProvider是AbstractTimerTask中定义的内部接口,定时任务会从该对象中获取Channel
        private final ChannelProvider channelProvider;
    
        // 任务的过期时间
        private final Long tick;
    
        // 任务是否已取消
        protected volatile boolean cancel = false;
    
        interface ChannelProvider {
            Collection<Channel> getChannels();
        }    
    }

AbstractTimerTask 实现了 TimerTask 接口的 run() 方法:

  1. 首先,从 ChannelProvider 中获取此次任务相关的 Channel 集合(在 Client 端只有一个 Channel,在 Server 端有多个 Channel);
  2. 然后,检查各个 Channel 的状态,针对未关闭的 Channel 执行 doTask() 方法处理;
  3. 最后,通过 reput() 方法将当前任务重新加入时间轮中,等待再次到期执行。
    // AbstractTimerTask.java
    
    @Override
    public void run(Timeout timeout) throws Exception {
        // 1.从 ChannelProvider 中获取此次任务相关的 Channel 集合
        Collection<Channel> c = channelProvider.getChannels();
    
        // 2.遍历Channel处理
        for (Channel channel : c) {
            if (channel.isClosed()) {
                continue;
            }
            // 抽象方法,对于未关闭的Channel执行任务
            doTask(channel);
        }
        // 3.将当前任务重新加入时间轮,以达到周期执行的效果
        reput(timeout, tick);
    }

上述的 doTask() 是 AbstractTimerTask 留给子类实现的抽象方法,不同的定时任务执行不同的操作。例如,HeartbeatTimerTask.doTask() 会计算当前时间与最后一次读写时间之间的间隔,如果大于心跳间隔,就会发送一个心跳请求,核心实现如下:

    // HeartbeatTimerTask.java
    
    protected void doTask(Channel channel) {
        try {
            // 获取最后一次读写时间
            Long lastRead = lastRead(channel);
            Long lastWrite = lastWrite(channel);
    
            // 最后一次读写时间超过心跳时间,就会发送心跳请求
            if ((lastRead != null && now() - lastRead > heartbeat)
                || (lastWrite != null && now() - lastWrite > heartbeat)) {
                // 发送一个心跳请求
                Request req = new Request();
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(true);
                req.setEvent(HEARTBEAT_EVENT);
                channel.send(req);
            }
        } catch (Throwable t) {
            logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
        }
    }

五、HeaderExchangeServer

HeaderExchangeServer 是 RemotingServer 的装饰器,实现自 RemotingServer 接口的大部分方法都委托给了内部所装饰的 RemotingServer 对象。HeaderExchangeServer 主要为其装饰的 RemotingServer 提供定期关闭空闲长连接的功能:

202308162142056287.png

5.1 核心字段

HeaderExchangeServer的核心字段包含了一个时间轮以及装饰的RemotingServer:

    // HeaderExchangeServer.java
    
    public class HeaderExchangeServer implements ExchangeServer {
        // 装饰的RemotingServer,HeaderExchangeServer中对RemotingServer接口的实现,都会委托给该对象进行处理
        private final RemotingServer server;
    
        // Server的关闭状态
        private AtomicBoolean closed = new AtomicBoolean(false);
    
        // 时间轮
        private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1,
                TimeUnit.SECONDS, TICKS_PER_WHEEL);
    
        // 关闭空闲长连接的定时任务
        private CloseTimerTask closeTimerTask;
    
        public HeaderExchangeServer(RemotingServer server) {
            Assert.notNull(server, "server == null");
            this.server = server;
            // 启动一个空闲长连接检测定时任务
            startIdleCheckTask(getUrl());
        }
    }

5.2 关闭空闲连接

在 HeaderExchangeServer 的构造方法中,startIdleCheckTask()方法会启动一个 CloseTimerTask 定时任务,定期关闭长时间空闲的连接:

    // HeaderExchangeServer.java
    
    private void startIdleCheckTask(URL url) {
        // RemotingServer的具体实现决定是否启动该任务
        if (!server.canHandleIdle()) {
            AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
            // 空闲时间
            int idleTimeout = getIdleTimeout(url);
            // 检测周期
            long idleTimeoutTick = calculateLeastDuration(idleTimeout);
            // 创建任务
            CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout);
            this.closeTimerTask = closeTimerTask;
            // 提交到时间轮
            IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS);
        }
    }

5.3 关闭Server

我们再来看看HeaderExchangeServer的关闭流程。HeaderExchangeServer.close() 方法的核心逻辑如下:

  1. 首先,将底层被装饰的 RemotingServer 的closing字段设置为true,表示这个 Server 端正在关闭,不再接受新连接;
  2. 向 Client 发送一个携带 ReadOnly 事件的请求(根据 URL 中的配置决定是否发送,默认为发送)。Client 端收到该请求后,会在 Channel 上添加 Key 为 channel.readonly 的附加信息,上层调用方会根据该附加信息,判断该连接是否可写;
  3. 循环检测是否还存在 Client 与当前 Server 维持着长连接,直至全部 Client 断开连接或超时;
  4. 更新HeaderExchangeServer的closed 字段为 true,之后 Client 不会再发送任何请求或回复响应;
  5. 取消 CloseTimerTask 定时任务;
  6. 调用底层修饰的 RemotingServer 对象的 close() 方法。以 NettyServer 为例,其 close() 方法会关闭Channel,并关闭 bossGroup 和 workerGroup 两个线程池。
    // HeaderExchangeServer.java
    
    @Override
    public void close(final int timeout) {
        // 1.将底层RemotingServer的closing字段设置为true,表示当前Server正在关闭,不再接收连接
        startClose();
        if (timeout > 0) {
            final long max = (long) timeout;
            final long start = System.currentTimeMillis();
            // 向 Client 发送一个携带 ReadOnly 事件的请求
            if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
                sendChannelReadOnlyEvent();
            }
            // 循环检测是否还存在与Client的连接
            while (HeaderExchangeServer.this.isRunning()
                   && System.currentTimeMillis() - start < max) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        // 取消 CloseTimerTask 定时任务
        doClose();
        // 调用底层修饰的 RemotingServer 对象的 close() 方法
        server.close(timeout);
    }

六、HeaderExchanger

对于Exchange子层的上层来说,Exchange 层的入口是 Exchangers 这个门面类,其中提供了多个 bind() 以及 connect() 方法的重载,这些重载方法最终会通过 SPI 机制,获取 Exchanger 接口的扩展实现,这个流程与Transport 层的入口—— Transporters 门面类相同。

6.1 Exchanger接口

我们可以看到 Exchanger 接口的定义与前面介绍的 Transporter 接口非常类似,同样是被 @SPI 接口修饰(默认扩展名为“header”,对应的是 HeaderExchanger 这个实现):

    @SPI(HeaderExchanger.NAME)
    public interface Exchanger {
    
        @Adaptive({Constants.EXCHANGER_KEY})
        ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
    
        @Adaptive({Constants.EXCHANGER_KEY})
        ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
    }

Exchanger 的 bind() 方法和 connect() 方法被 @Adaptive 注解修饰,可以通过 URL 中的 exchanger 参数值指定扩展名称来覆盖默认值。

Dubbo 只为 Exchanger 接口提供了 HeaderExchanger 这一个实现,其中 connect() 方法创建的是 HeaderExchangeClient 对象,bind() 方法创建的是 HeaderExchangeServer 对象:

202308162142064518.png

从 HeaderExchanger 的实现可以看到,它会在 Transport 层的 Client 和 Server 实现基础之上,添加HeaderExchangeClient 和 HeaderExchangeServer 装饰器。同时,为上层实现的 ExchangeHandler 实例添加了 HeaderExchangeHandler 以及 DecodeHandler 两个装饰器:

    // HeaderExchanger.java
    
    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        @Override
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
    
        @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    }

七、总结

本章,我对 Dubbo Exchange 子层中的 Request-Response 响应模型进行了分析,然后又介绍了 ExchangeChannel 对 Channel 接口的实现,同时还说明了发送请求之后得到的 DefaultFuture 对象。最后,我讲解了 HeaderExchangeHandler 是如何将 Transporter 层的 ChannelHandler 对象与上层的 ExchangeHandler 对象相关联的。

阅读全文