2023-07-30
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

前面章节提到,我们的分布式文件系统要追求高性能,需要从最基本的两个方面去考量:

  1. 客户端连接方式:目前客户端采用短连接,每次发送请求都需要重新建立连接,频繁的建立和断开连接使得系统的性能开销很大;
  2. 客户端请求方式:目前客户端采用同步请求方式,效率非常低,我们可以参考Kafka,实现异步请求及回调机制。

本章,我就来对DFS客户端进行改造,支持长连接机制和异步请求机制。

本章的代码存放在:https://gitee.com/ressmix/source-code/tree/master/5.dfs/6.performance, 需要的读者请自行下载参阅。

一、整体设计

新的DFS客户端的设计主要分为两部分:

  1. 连接管理:客户端应当管理各个Host的连接,包括针对该Host建立连接、根据Host获取已建立的连接;
  2. 异步请求:客户端与Host建立连接后,对于发送的请求进行排队异步化处理,并支持响应回调。

我用先用一张图,来表述改造后的客户端的整个请求/响应处理流程:

202307302137523061.png

如果读者看过我之前的《透彻理解Kafka》专栏,一定会觉得上述流程与Kafka Client的请求/响应处理流程很类似,事实上,我就是模仿了Kafka的客户端设计来进行改造的,同时做了很多精简。

Kafka的网络通信模块,基于Java NIO实现,非常值得精细研读其源码,并且可以直接借鉴并运用到自己的系统中,因为已经经历了大量生产的考验。

1.1 核心组件

接着,我将从类级别,讲解重构后的客户端的核心组件:

  • FileSystem:接口类,封装了面向使用方的接口定义,比如创建目录、上传文件、下载文件等等;
  • NetworkRequest:请求封装类;
  • NetworkResponse:响应封装类;
  • ResponseCallback:响应回调接口;
  • NIOClient:NIO客户端,负责与NIO Server通信;
  • NetworkManager:网络管理器,也是最核心的组件,负责建立连接、缓存连接,发送网络请求,读取响应,它的内部包含了负责实际发送请求和接受响应的Sender线程。

二、底层实现

我们来看各个组件的底层实现。

2.1 FileSystem

FileSystem接口比较简单:

    /**
     * 作为文件系统的接口
     */
    public interface FileSystem {
        /**
         * 上传一个文件
         */
        Boolean upload(FileInfo fileInfo, ResponseCallback callback) throws Exception;
    
        /**
         * 下载一个文件
         */
        byte[] download(FileInfo fileInfo) throws Exception;
    
        //...
    }

我们重点看它的uploaddownload方法的实现,内部实际上是调用了NIOClient这一组件进行真正的文件上传和下载:

    /**
     * 文件系统客户端的实现类
     */
    public class FileSystemImpl implements FileSystem {
        // gRPC客户端
        private NameNodeRpcClient rpcClient;
        // nio客户端
        private NIOClient nioClient;
    
        public FileSystemImpl() {
            this.rpcClient = new NameNodeRpcClient();
            this.nioClient = new NIOClient();
        }
    
        public Boolean upload(FileInfo fileInfo, ResponseCallback callback) {
            // 1.RPC接口发送文件元数据
            String filename = fileInfo.getFilename();
            if (!filename.startsWith(File.separator)) {
                filename = File.separator + filename;
            }
            if (!rpcClient.createFile(filename)) {
                return false;
            }
    
            // 2.RPC接口获取DataNode
            String datanodesJson = rpcClient.allocateDataNodes(fileInfo, "");
            System.out.println(datanodesJson);
            if (datanodesJson == null) {
                return false;
            }
    
            // 3.遍历DataNode,依次上传文件
            JSONArray datanodes = JSONArray.parseArray(datanodesJson);
            for (int i = 0; i < datanodes.size(); i++) {
                JSONObject datanode = datanodes.getJSONObject(i);
                String hostname = datanode.getString("hostname");
                String ip = datanode.getString("ip");
                int nioPort = datanode.getIntValue("nioPort");
                Host host = new Host(hostname, ip, nioPort);
                try {
                    // 使用nio客户端上传文件
                    nioClient.sendFile(host, fileInfo, callback);
                } catch (Exception ex) {
                    ex.printStackTrace();
                    // 出现异常时,上送异常DataNode信息,并重新获取一个正常的DataNode
                    String reallocateDataNode = rpcClient.allocateDataNodes(fileInfo, ip + "-" + hostname);
                    datanode = JSONObject.parseObject(reallocateDataNode);
                    hostname = datanode.getString("hostname");
                    ip = datanode.getString("ip");
                    nioPort = datanode.getIntValue("nioPort");
                    nioClient.sendFile(new Host(hostname, ip, nioPort), fileInfo, callback);
                }
    
            }
            return true;
        }
    
        public byte[] download(FileInfo fileInfo) throws Exception {
            // 1.获取待下载文件对应的可用DataNode节点
            String datanode = rpcClient.getDataNodeForFile(fileInfo.getFilename(), "");
            System.out.println("NameNode分配用来下载文件的数据节点:" + datanode);
    
            // 2.解析DataNode信息
            JSONObject jsonObject = JSONObject.parseObject(datanode);
            String hostname = jsonObject.getString("hostname");
            String ip = jsonObject.getString("ip");
            Integer nioPort = jsonObject.getInteger("nioPort");
            Host host = new Host(hostname, ip, nioPort);
    
            // 3.基于Java NIO下载文件
            byte[] file = null;
            try {
                // 使用nio客户端上传文件
                file = nioClient.readFile(host, fileInfo);
            } catch (Exception ex) {
                // 出现异常,重新获取一个可用DataNode,上送异常的DataNode信息
                datanode = rpcClient.getDataNodeForFile(fileInfo.getFilename(), ip + "-" + hostname);
                jsonObject = JSONObject.parseObject(datanode);
                hostname = jsonObject.getString("hostname");
                ip = jsonObject.getString("ip");
                nioPort = jsonObject.getInteger("nioPort");
                try {
                    file = nioClient.readFile(new Host(hostname, ip, nioPort), fileInfo);
                } catch (Exception e2) {
                    throw e2;
                }
            }
            return file;
        }
    
        public void shutdown() throws Exception {
            rpcClient.shutdown();
        }
    }

2.2 NIOClient

NIOClient内部封装了组件NetworkManager,并提供sendFilereadFile两个接口:

  • sendFile: 上传文件,首先尝试与目标主机建立连接,然后异步发送上传文件请求,不关注响应结果;
  • readFile: 下载文件,首先尝试与目标主机建立连接,然后异步发送上传文件请求,最后阻塞等待响应结果。
    public class NIOClient {
    
        private NetworkManager networkManager;
    
        public NIOClient() {
            this.networkManager = new NetworkManager();
        }
    
        /**
         * 上传文件
         */
        public void sendFile(Host host, FileInfo fileInfo, ResponseCallback callback) {
            // 1.尝试与HOST建立连接,阻塞等待直到建立连接或失败
            if (!networkManager.tryConnect(host)) {
                throw new RuntimeException("建立连接失败");
            }
            // 2.异步发送请求
            NetworkRequest request = createUploadFileRequest(host, fileInfo, callback);
            networkManager.sendRequest(request);
        }
    
        /**
         * 下载文件
         */
        public byte[] readFile(Host host, FileInfo fileInfo) throws Exception {
            // 1.尝试与HOST建立连接,阻塞等待直到建立连接或失败
            if (!networkManager.tryConnect(host)) {
                throw new RuntimeException("建立连接失败");
            }
    
            // 2.异步发送请求
            NetworkRequest request = createDownloadFileRequest(host, fileInfo, null);
            networkManager.sendRequest(request);
    
            // 3.阻塞等待响应
            NetworkResponse response = networkManager.waitResponse(request.getId());
            if (response.getError()) {
                throw new RuntimeException("响应异常");
            }
            return response.getBuffer().array();
        }
    
        /*----------------------------------------PRIVATE METHOD---------------------------------------*/
    
        /**
         * 创建一个上传文件的请求
         */
        private NetworkRequest createUploadFileRequest(Host host, FileInfo fileInfo, ResponseCallback callback) {
            ByteBuffer buffer = ByteBuffer.allocate(
                    NetworkRequest.REQUEST_TYPE +
                            NetworkRequest.FILENAME_LENGTH +
                            fileInfo.getFilename().getBytes().length +
                            NetworkRequest.FILE_LENGTH +
                            fileInfo.getFileLength());
            buffer.putInt(NetworkRequest.REQUEST_SEND_FILE);
            buffer.putInt(fileInfo.getFilename().getBytes().length);
            buffer.put(fileInfo.getFilename().getBytes());
            buffer.putLong(fileInfo.getFileLength());
            buffer.put(fileInfo.getFile());
            buffer.rewind();
    
            NetworkRequest request = new NetworkRequest();
            // 请求ID,唯一标识
            request.setId(UUID.randomUUID().toString());
            request.setHostname(host.getHostname());
            request.setIp(host.getIp());
            request.setNioPort(host.getNioPort());
            request.setRequestType(NetworkRequest.REQUEST_SEND_FILE);
            request.setBuffer(buffer);
            request.setNeedResponse(false);
            request.setCallback(callback);
    
            return request;
        }
    
        /**
         * 创建一个下载文件的请求
         */
        private NetworkRequest createDownloadFileRequest(Host host, FileInfo fileInfo, ResponseCallback callback) {
            NetworkRequest request = new NetworkRequest();
    
            byte[] filenameBytes = fileInfo.getFilename().getBytes();
    
            ByteBuffer buffer = ByteBuffer.allocate(
                    NetworkRequest.REQUEST_TYPE +
                            NetworkRequest.FILENAME_LENGTH +
                            filenameBytes.length);
            buffer.putInt(NetworkRequest.REQUEST_READ_FILE);
            buffer.putInt(filenameBytes.length);
            buffer.put(filenameBytes);
            buffer.rewind();
    
            request.setId(UUID.randomUUID().toString());
            request.setHostname(host.getHostname());
            request.setIp(host.getIp());
            request.setNioPort(host.getNioPort());
            request.setRequestType(NetworkRequest.REQUEST_READ_FILE);
            request.setBuffer(buffer);
            request.setNeedResponse(true);
            request.setCallback(callback);
    
            return request;
        }
    }

2.3 NetworkRequest

NetworkRequest就是请求的封装类,里面包含了这个请求的完整内容,以及目标Host信息:

    /**
     * 网络请求
     */
    public class NetworkRequest {
        // 请求类型:4字节
        public static final Integer REQUEST_TYPE = 4;
        // 文件名大小:4字节
        public static final Integer FILENAME_LENGTH = 4;
        // 文件内容大小:4字节
        public static final Integer FILE_LENGTH = 8;
        // 上传文件请求
        public static final Integer REQUEST_SEND_FILE = 1;
        // 下载文件请求
        public static final Integer REQUEST_READ_FILE = 2;
    
        private String id;
        private String hostname;
        private String ip;
        private Integer nioPort;
    
        private Integer requestType;
        private Long sendTime;
    
        // 完整请求内容Buffer
        private ByteBuffer buffer;
        // 当前请求是否需要响应,当为true时,客户端必须等收到响应后,才会发送下一个请求
        private Boolean needResponse;
    
        private ResponseCallback callback;
        //...
    }

注意,上述的needResponse表示当前请求是否需要关注响应,当为true时,相当于客户端发送后即丢弃;否则,客户端必须等收到响应后,才会发送下一个请求。

2.4 NetworkResponse

NetworkResponse就是响应的封装类,里面包含了与该响应对应的请求ID,目标Host信息,响应内容,以及是否出现是否出现异常等等信息:

    /**
     * 网络响应
     */
    public class NetworkResponse {
    
        public static final String RESPONSE_SUCCESS = "SUCCESS";
    
        private String requestId;
        private String hostname;
        private String ip;
        private Integer nioPort;
    
        // 响应头大小
        private ByteBuffer lengthBuffer;
        // 响应体内容
        private ByteBuffer buffer;
    
        private Boolean error;
        private Boolean finished;
        //...
    }

2.5 ResponseCallback

ResponseCallback是一个接口类,用来定义响应回调处理逻辑,由用户自定义实现:

    /**
     * 响应回调函数接口
     */
    public interface ResponseCallback {
    
        /**
         * 处理响应结果
         */
        void process(NetworkResponse response);
    }

2.6 NetworkManager

NetworkManager是整个DFS客户端最核心的组件,它通过一个名为connectState的 HashMap缓存客户端与Host的连接状态,这样当NIOClient尝试建立连接时,如果发现与目标Host的连接状态是CONNECTED,就不用阻塞等待,可以立即返回。

整体结构

NetworkManager内部包含了一个Selector,用于注册SocketChannel,它在构造是会创建两个线程:

  • NetworkPollThread:Sender线程,不断从缓存中获取请求进行发送;
  • RequestTimeoutCheckThread:超时检测线程,顾名思义,就是检测超时未发送的请求。
    /**
     * 网络连接管理器
     */
    public class NetworkManager {
        // 连接状态:正在连接
        public static final Integer CONNECTING = 1;
        // 连接状态:已连接
        public static final Integer CONNECTED = 2;
        // 连接状态:断开连接
        public static final Integer DISCONNECTED = 3;
    
        // poll超时时间(毫秒)
        public static final Long POLL_TIMEOUT = 500L;
    
        // 请求超时时长(30秒)
        public static final long REQUEST_TIMEOUT = 30 * 1000;
        // 请求超时检测间隔(1秒)
        public static final long REQUEST_TIMEOUT_CHECK_INTERVAL = 1000;
    
        // 多路复用Selector
        private Selector selector;
    
        // 缓存等待建立连接的Host
        private ConcurrentLinkedQueue<Host> waitingConnectHosts;
    
        // 缓存Host的连接状态
        private Map<Host, Integer> connectState;
        // 缓存Host对应的Channel
        private Map<Host, SelectionKey> connections;
    
        // 缓存待发送到Host的请求,保存到Host各自的请求队列
        private Map<Host, ConcurrentLinkedQueue<NetworkRequest>> waitingRequests;
        // 缓存当前正要发送到Host的请求
        private Map<Host, NetworkRequest> toSendRequests;
    
        // 缓存等待处理且已读取完整的响应,Key为请求ID
        private Map<String, NetworkResponse> finishedResponses;
        // 缓存等待处理且未读取完整的响应
        private Map<Host, NetworkResponse> unfinishedResponses;
    
        public NetworkManager() {
            try {
                this.selector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            this.connections = new ConcurrentHashMap<Host, SelectionKey>();
            this.connectState = new ConcurrentHashMap<Host, Integer>();
            this.waitingConnectHosts = new ConcurrentLinkedQueue<Host>();
            this.waitingRequests = new ConcurrentHashMap<Host, ConcurrentLinkedQueue<NetworkRequest>>();
            this.toSendRequests = new ConcurrentHashMap<Host, NetworkRequest>();
            this.finishedResponses = new ConcurrentHashMap<String, NetworkResponse>();
            this.unfinishedResponses = new ConcurrentHashMap<Host, NetworkResponse>();
    
            // Sender线程
            new NetworkPollThread().start();
            // 请求超时检测线程
            new RequestTimeoutCheckThread().start();
        }
        //...
    }

建立连接

我们先来看 建立连接 的方法,这个方法实际上是将Host对象扔到一个队列中,然后由其它线程负责连接的建立。该方法会阻塞直到与目标Host成功建立连接(通过Host状态Map判断是否成功建立连接):

    // NetworkManager.java
    
    /**
     * 尝试建立连接
     */
    public Boolean tryConnect(Host host) {
        assert host != null;
        // 这里要加锁,防止多个线程同时建立连接
        synchronized (this) {
            if (!connectState.containsKey(host) || connectState.get(host).equals(DISCONNECTED)) {
                connectState.put(host, CONNECTING);
                // 加入HOST队列,等待线程异步出队并建立连接
                waitingConnectHosts.offer(host);
            }
            // 循环等待直到建立连接
            while (connectState.get(host).equals(CONNECTING)) {
                try {
                    // 等待100毫秒
                    wait(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 如果连接建立失败
            if (connectState.get(host).equals(DISCONNECTED)) {
                return false;
            }
            return true;
        }
    }

再来看发送请求,整体实现非常简单,就是将请求入队到目标Host对应的一个队列中,后续会由Sender线程异步发送:

    // NetworkManager.java
    
    /**
     * 发送网络请求
     */
    public void sendRequest(NetworkRequest request) {
        // 1.确认这个请求要发给哪个host
        Host host = new Host(request.getHostname(), request.getIp(), request.getNioPort());
        ConcurrentLinkedQueue<NetworkRequest> requestQueue = waitingRequests.get(host);
        // 2.将请求缓存到该host对应的请求队列中
        requestQueue.offer(request);
    }

Sender线程

我们重点来看Sender线程的逻辑,它启动后就在一个循环中不断进行如下处理:

  1. 建立连接: 从Host队列中获取等待建立连接的Host,创建SocketChannel并注册到Selector上,关注OP_CONNECT事件。这样当OP_CONNECT事件发生时,就可以执行连接的建立,并更新连接状态;
  2. 派发请求: 由于NIOClient会不断创建NetworkRequest,扔到待发送的请求缓存队列中,所以Sender线程会不断从队列中出队元素,并派发到一个toSendRequests的HashMap中,表示针对该Host正要发送的请求,并监听OP_WRITE是事件;
  3. 发送请求: Sender线程会监听Selector上注册的Channel的各类事件,当出现OP_WRITE事件时,表示可以发送请求,于是向Channel写入内容,然后监听它的OP_READ事件;
  4. 读取响应: 当出现OP_WRITE事件时,Sender线程会根据不同请求类型分别对响应进行处理,并封装NetworkResponse对象。如果请求不是*“发送即丢弃”*的模式,还需要将响应扔到一个缓存中让客户端后续处理。最后,如果一个响应已经全部处理完成,需要取消对OP_READ事件关注,即不再关注响应。
    // NetworkManager.java
    
    /**
     * 网络连接核心线程(Sender线程)
     */
    class NetworkPollThread extends Thread {
        @Override
        public void run() {
            while (true) {
                // 1.对各个Host建立连接:关注OP_CONNECT
                tryConnect();
                // 2.对已建立连接的Host,关注OP_WRITE,也就是准备发送请求
                prepareRequests();
                // 3.监听Selector注册的Channel的各类事件,进行处理
                poll();
            }
        }
    
        /**
         * 尝试与排队中的HOST建立连接
         */
        private void tryConnect() {
            Host host = null;
            SocketChannel channel = null;
            // 不断出队准备建立连接的HOST
            while ((host = waitingConnectHosts.poll()) != null) {
                if (CONNECTED.equals(connectState.get(host))) {
                    continue;
                }
                // 建立SocketChannel,并注册到Selector,关注OP_CONNECT事件
                try {
                    channel = SocketChannel.open();
                    channel.configureBlocking(false);
                    channel.connect(new InetSocketAddress(host.getHostname(), host.getNioPort()));
                    channel.register(selector, SelectionKey.OP_CONNECT);
                } catch (Exception e) {
                    e.printStackTrace();
                    connectState.put(host, DISCONNECTED);
                }
            }
        }
    
        /**
         * 准备好要发送的请求
         */
        private void prepareRequests() {
            for (Host host : waitingRequests.keySet()) {
                ConcurrentLinkedQueue<NetworkRequest> requestQueue = waitingRequests.get(host);
                if (requestQueue == null || requestQueue.isEmpty()) {
                    continue;
                }
                // 对当前Host派发一个请求,准备发送
                if (!toSendRequests.containsKey(host)) {
                    NetworkRequest request = requestQueue.poll();
                    toSendRequests.put(host, request);
                    SelectionKey key = connections.get(host);
                    // 关注OP_WRITE事件,也就是发送请求
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }
        }
    
        /**
         * 完成连接建立、请求发送、响应读取
         */
        private void poll() {
            SocketChannel channel = null;
            try {
                int selectedKeys = selector.select(POLL_TIMEOUT);
                if (selectedKeys <= 0) {
                    return;
                }
    
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keysIterator.next();
                    keysIterator.remove();
    
                    channel = (SocketChannel) key.channel();
                    // 1.连接事件
                    if (key.isConnectable()) {
                        finishConnect(key, channel);
                    }
                    // 2.发送请求
                    else if (key.isWritable()) {
                        sendRequest(key, channel);
                    }
                    // 3.接受响应
                    else if (key.isReadable()) {
                        readResponse(key, channel);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 建立连接
         */
        private void finishConnect(SelectionKey key, SocketChannel channel) {
            Host host = null;
            try {
                // 阻塞直到连接建立
                if (channel.isConnectionPending()) {
                    while (!channel.finishConnect()) {
                        Thread.sleep(100);
                    }
                }
                host = parseHost(channel);
                System.out.println("与远程服务器" + host + "成功建立连接......");
    
                waitingRequests.putIfAbsent(host, new ConcurrentLinkedQueue<NetworkRequest>());
                // 关联Host和连接
                connections.put(host, key);
                // 更新host连接状态
                connectState.put(host, CONNECTED);
            } catch (Exception e) {
                e.printStackTrace();
                if (host != null) {
                    connectState.put(host, DISCONNECTED);
                }
            }
        }
    
        /**
         * 发送请求
         */
        private void sendRequest(SelectionKey key, SocketChannel channel) {
            Host host = null;
            try {
                // 1.解析Host
                host = parseHost(channel);
    
                // 2.获取要发送到该Host的请求
                NetworkRequest request = toSendRequests.get(host);
                ByteBuffer buffer = request.getBuffer();
    
                // 3.写请求内容,while处理防止拆包
                channel.write(buffer);
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
                request.setSendTime(System.currentTimeMillis());
                System.out.println("本次向" + host + "机器的请求发送完毕......");
    
                // 4.关注OP_READ事件,即服务端的响应
                key.interestOps(SelectionKey.OP_READ);
            } catch (Exception e) {
                e.printStackTrace();
                // 出现异常时,取消关注OP_WRITE
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    
                // 封装一个异常响应
                NetworkRequest request = toSendRequests.get(host);
                NetworkResponse response = new NetworkResponse();
                response.setRequestId(request.getId());
                response.setHostname(host.getHostname());
                response.setIp(host.getIp());
                response.setError(true);
                response.setFinished(true);
                if (request.getNeedResponse()) {
                    finishedResponses.put(request.getId(), response);
                } 
                // 发送即丢弃模式
                else {
                    if (request.getCallback() != null) {
                        request.getCallback().process(response);
                    }
                    toSendRequests.remove(host);
                }
            }
        }
    
        /**
         * 读取响应
         */
        private void readResponse(SelectionKey key, SocketChannel channel) throws Exception {
            Host host = parseHost(channel);
    
            // 1.获取当前正在发送的请求
            NetworkRequest request = toSendRequests.get(host);
    
            // 2.针对不同请求类型,解析响应内容
            NetworkResponse response = null;
            if (request.getRequestType().equals(NetworkRequest.REQUEST_SEND_FILE)) {
                response = getSendFileResponse(request.getId(), host, channel);
            } else if (request.getRequestType().equals(NetworkRequest.REQUEST_READ_FILE)) {
                response = getReadFileResponse(request.getId(), host, channel);
            }
            // 如果响应没处理完,出现了拆包,则需要继续监听该Channel的OP_READ事件
            if (Boolean.FALSE.equals(response.getFinished())) {
                return;
            }
    
            // 3.取消对OP_READ事件关注,即不再关注响应
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
    
            // 4.处理响应
            if (request.getNeedResponse()) {
                finishedResponses.put(request.getId(), response);
            } 
            // 发送即丢弃模式
            else {
                if (request.getCallback() != null) {
                    request.getCallback().process(response);
                }
                toSendRequests.remove(host);
            }
        }
    }

上述步骤,特别需要注意的是:对于每一个目标Host,每次只能发送一个NetworkRequst请求,这个请求会被先缓存在toSendRequests中,只有当处理完请求对应的响应(或者是发送即丢弃模式——needResponse为false,不关注响应),才能继续处理下一个请求队列waitingRequests中的NetworkRequst。

超时处理线程

最后,我们来看下超时检测线程,它的处理逻辑就很简单了,就是默认每个1s检测下各个Host的当前正在发送的请求,如果某个NetworkRequest超时了(默认每个请求的最大等待响应事件为30s),就将它移除掉,这样Sender线程就会将请求队列waitingRequests中的下一个请求缓存到toSendRequests中:

    /**
     * 超时检测线程
     */
    class RequestTimeoutCheckThread extends Thread {
        @Override
        public void run() {
            while(true) {
                try {
                    long now = System.currentTimeMillis();
                    for(NetworkRequest request : toSendRequests.values()) {
                        // 每个请求的最大等待响应事件为30s
                        if(now - request.getSendTime() > REQUEST_TIMEOUT) {
                            String hostname = request.getHostname();
                            NetworkResponse response = new NetworkResponse();
                            response.setHostname(hostname);
                            response.setIp(request.getIp());
                            response.setRequestId(request.getId());
                            response.setError(true);
                            response.setFinished(true);
    
                            if(request.getNeedResponse()) {
                                finishedResponses.put(request.getId(), response);
                            } else {
                                if(request.getCallback() != null) {
                                    request.getCallback().process(response);
                                }
                                toSendRequests.remove(hostname);
                            }
                        }
                    }
                    // 每秒检查1次
                    Thread.sleep(REQUEST_TIMEOUT_CHECK_INTERVAL);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

三、总结

本章,我对分布式文件系统客户端的长连接与异步请求机制进行了深入讲解,并给出了代码实现。基于Java NIO设计网络通信客户端时,需要考虑的点非常多,比如本章实现的长连接机制、请求排队机制、超时处理机制、回调机制、同步异步机制等等。

另外,感兴趣的读者可以深入去阅读Kafka Java客户端的源码(也可以参考我的专栏《透彻理解Kafka》),Kafka在实现客户端时还考虑了批处理机制,完全实现了一个高性能低延迟的NIO通信客户端。

阅读全文