本章,我将对DataNode的另一类高可用问题进行讲解,即DFS客户端在文件传输过程中,如果出现中断或异常该如何处理?
DFS客户端与DataNode在文件传输过程中,发生中断异常。此时,DFS客户端需要向NameNode上报异常节点的信息,然后NameNode再选择可用的DataNode让DFS重新传输。
一、文件下载重传输
客户端对传输中断异常的处理主要分为两种情况: 文件下载重传输 和 文件上传重传输 ,我们先来看下载重传输的情况。DFS客户端下载文件出现中断时,需要做两件事情:
- 告诉NameNode哪个DataNode出现了问题;
- NameNode重新选择一个可用DataNode返回给客户端,客户端重新下载。
1.1 客户端侧
我们需要改写DFS客户端侧的获取下载DataNode的RPC接口——getDataNodeForFile
,在请求中加上一个excludedDataNodeId
字段,表示当前故障的DataNode节点:
message GetDataNodeForFileRequest{
string filename = 1;
string excludedDataNodeId=2;
}
// NameNodeRpcClient.java
/**
* 发送请求获取指定文件所在的随机DataNode节点
*/
public String getDataNodeForFile(String filename,String excludedDataNode) {
GetDataNodeForFileRequest request = GetDataNodeForFileRequest.newBuilder()
.setFilename(filename)
.setExcludedDataNodeId(excludedDataNode) // 异常DataNode
.build();
GetDataNodeForFileResponse response = namenode.getDataNodeForFile(request);
return response.getDatanode();
}
接着,DFS客户端在下载文件时,需要对中断异常做一些处理,即向新可用的DataNode进行文件传输:
// FileSystemImpl.java
public byte[] download(String filename) throws Exception {
// 1.获取待下载文件对应的可用DataNode节点
String datanode = rpcClient.getDataNodeForFile(filename, "");
System.out.println("NameNode分配用来下载文件的数据节点:" + datanode);
// 2.解析DataNode信息
JSONObject jsonObject = JSONObject.parseObject(datanode);
String hostname = jsonObject.getString("hostname");
String ip = jsonObject.getString("ip");
Integer nioPort = jsonObject.getInteger("nioPort");
// 3.基于Java NIO下载文件
byte[] file = null;
try {
file = DFSNIOClient.readFile(hostname, nioPort, filename);
} catch (Exception ex) {
// 出现异常,重新获取一个可用DataNode,上送异常的DataNode信息
datanode = rpcClient.getDataNodeForFile(filename, ip + "-" + hostname);
jsonObject = JSONObject.parseObject(datanode);
hostname = jsonObject.getString("hostname");
nioPort = jsonObject.getInteger("nioPort");
try {
file = DFSNIOClient.readFile(hostname, nioPort, filename);
} catch (Exception e2) {
throw e2;
}
}
return file;
}
1.2 NameNode侧
同时,我们需要对NameNode侧做一些改造,当RPC接口getDataNodeForFile
上送了异常DataNode信息时,需要剔除该节点:
// NameNodeServiceImpl.java
/**
* 下载文件
*/
@Override
public void getDataNodeForFile(GetDataNodeForFileRequest request, StreamObserver<GetDataNodeForFileResponse> responseObserver) {
String filename = request.getFilename();
// 异常DataNode
String excludedDataNode = request.getExcludedDataNodeId();
DataNodeInfo datanode = datanodeManager.getDataNodeForFile(filename,excludedDataNode);
GetDataNodeForFileResponse response = GetDataNodeForFileResponse.newBuilder()
.setDatanode(JSONObject.toJSONString(datanode))
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
选择一个正常的DataNode节点:
// DataNodeManager.java
/**
* 获取可供下载的DataNode节点
*/
public DataNodeInfo getDataNodeForFile(String filename, String excludedDataNode) {
try {
rrw.readLock().lock();
// 需要排除的节点
DataNodeInfo excluded = datanodes.get(excludedDataNode);
List<DataNodeInfo> datanodes = datanodeMappedByFile.get(filename);
if (datanodes.size() == 1 && (datanodes.get(0).equals(excluded))) {
return null;
}
int size = datanodes.size();
// 随机选择一个非异常节点
Random random = new Random(System.currentTimeMillis());
while (true) {
int index = random.nextInt(size);
DataNodeInfo datanode = datanodes.get(index);
if (!datanode.equals(excluded)) {
return datanode;
}
}
} finally {
rrw.readLock().lock();
}
}
二、文件上传重传输
文件上传中断的异常处理流程是类似的,DFS客户端上传文件出现中断时,需要做两件事情:
- 告诉NameNode哪个DataNode出现了问题;
- NameNode重新选择一个可用DataNode返回给客户端,客户端重新上传。
2.1 客户端侧
首先,我们需要改写DFS客户端侧获取上传DataNode的RPC接口——allocateDataNodes
,在请求中加上一个excludedDataNodeId
字段,表示当前故障的DataNode节点:
message AllocateDataNodesRequest{
string filename = 1;
int64 filesize = 2;
string excludedDataNodeId=3;
}
// NameNodeRpcClient.java
/**
* 分配双副本DataNode数据节点
*/
public String allocateDataNodes(String filename, long fileSize,String excludedDataNodeId) {
AllocateDataNodesRequest request = AllocateDataNodesRequest.newBuilder()
.setFilename(filename)
.setFilesize(fileSize)
.setExcludedDataNodeId(excludedDataNodeId)
.build();
AllocateDataNodesResponse response = namenode.allocateDataNodes(request);
return response.getStatus() == 1 ? response.getDatanodes() : null;
}
接着,DFS客户端在上传文件时,需要对中断异常做一些处理,即上传异常节点信息,然后重新上传:
public Boolean upload(byte[] file, String filename, long fileSize) throws Exception {
// 1.RPC接口发送文件元数据
if (!filename.startsWith(File.separator)) {
filename = File.separator + filename;
}
if (!rpcClient.createFile(filename)) {
return false;
}
// 2.RPC接口获取DataNode
String datanodesJson = rpcClient.allocateDataNodes(filename, fileSize, "");
System.out.println(datanodesJson);
if (datanodesJson == null) {
return false;
}
// 3.遍历DataNode,依次上传文件
JSONArray datanodes = JSONArray.parseArray(datanodesJson);
for (int i = 0; i < datanodes.size(); i++) {
JSONObject datanode = datanodes.getJSONObject(i);
String hostname = datanode.getString("hostname");
String ip = datanode.getString("ip");
int nioPort = datanode.getIntValue("nioPort");
try {
DFSNIOClient.sendFile(hostname, nioPort, file, filename, fileSize);
} catch (Exception ex) {
ex.printStackTrace();
// 出现异常时,上送异常DataNode信息,并重新获取一个正常的DataNode
String reallocateDataNode= rpcClient.allocateDataNodes(filename, fileSize, ip + "-" + hostname);
datanode = JSONObject.parseObject(reallocateDataNode);
hostname = datanode.getString("hostname");
nioPort = datanode.getIntValue("nioPort");
DFSNIOClient.sendFile(hostname, nioPort, file, filename, fileSize);
}
}
return true;
}
2.2 NameNode侧
最后,需要对NameNode侧做一些改造,当RPC接口allocateDataNodes
上送了异常DataNode信息时,需要剔除该节点:
// NameNodeServiceImpl.java
/**
* 获取可供上传的DataNode节点及副本
*/
@Override
public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
try {
AllocateDataNodesResponse response = null;
if (!isRunning) {
response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
} else {
long fileSize = request.getFilesize();
// 需要剔除的异常DataNode
String excludedDataNodeId= request.getExcludedDataNodeId();
List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize,excludedDataNodeId);
String datanodesJson = JSONArray.toJSONString(datanodes);
response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
}
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
选择一个正常的DataNode节点:
// DataNodeManager.java
/**
* 获取可供上传的DataNode节点及副本
*/
public List<DataNodeInfo> allocateDataNodes(long fileSize, String excludedDataNodeId) {
synchronized (this) {
DataNodeInfo excludedDataNode = datanodes.get(excludedDataNodeId);
List<DataNodeInfo> datanodeList = new ArrayList<>();
for (DataNodeInfo datanode : datanodes.values()) {
if (!datanode.equals(excludedDataNode)) {
datanodeList.add(datanode);
}
}
Collections.sort(datanodeList);
// 选择存储数据最少的头两个datanode出来
List<DataNodeInfo> selectedDatanodes = new ArrayList<>();
if (datanodeList.size() >= 2) {
selectedDatanodes.add(datanodeList.get(0));
selectedDatanodes.add(datanodeList.get(1));
// 增加节点存储数据的大小
datanodeList.get(0).addStoredDataSize(fileSize);
datanodeList.get(1).addStoredDataSize(fileSize);
} else if (datanodeList.size() == 1) {
selectedDatanodes.add(datanodeList.get(0));
}
return selectedDatanodes;
}
}
三、总结
本章,我对DataNode高可用的另一类异常——文件传输中断问题进行了讲解,如果DFS客户端在文件传输过程中,DataNode出现了异常,那么应当重新向NameNode申请可用DataNode,并重新进行文件上传或下载。整个处理思路是比较简单的,主要是异常状况下上送异常DataNode的信息,以及NameNode对正常DataNode的选取。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。