上一章,我对BackupNode的checkpoint机制进行了深入剖析,还遗留了fsimage快照传输没有讲解。BackupNode在生成完fsimage快照文件后,需要基于NIO网络通信将fsimage文件传输给NameNode。
关于NIO的网络通信机制,我在《透彻理解Kafka》系列中深入讲解过,本节我就来仿照Kafka的通信组件实现fsimage快照传输的BackupNode客户端/NameNode服务端的代码。
一、快照传输
首先,fsimage快照传输是通过BackupNode的 FSImageUploader 组件来完成,FSImageCheckPointer在执行checkpoint机制流程的第四步会将fsimage发送给NameNode:
/**
* 生成fsimage快照文件的线程
*/
public class FSImageCheckPointer extends Thread {
/**
* 执行checkpoint机制
*/
private void doCheckpoint() throws Exception {
// 1.生成fsimage
FSImage fsimage = namesystem.getFSImage();
// 2.删除上一次的fsimage文件
removeLastFSImageFile();
// 3.保存fsimage文件到磁盘
writeFSImageFile(fsimage);
// 4.将fsimage文件发送给NameNode
uploadFSImageFile(fsimage);
// 5.将checkpoint信息发送给NameNode
updateCheckpointTxid(fsimage);
// 6.持久化checkpoint信息
saveCheckpointInfo(fsimage);
}
/**
* 发送fsimage文件
*/
private void uploadFSImageFile(FSImage fsimage) {
FSImageUploader fsimageUploader = new FSImageUploader(fsimage);
fsimageUploader.start();
}
//...
}
1.1 Client端
我们采用Java NIO实现BackupNode作为Client端的代码,核心流程就是:
- 创建一个SocketChannel,与Server端的9000端口建立连接;
- 创建一个Selector,并注册到SocketChannel中;
- 监听channel上的各类事件,比如连接建立事件、可读事件等等,并进行相应的处理。
/**
* fsimage快照传输线程
*
* @author Ressmix
*/
public class FSImageUploader extends Thread {
private FSImage fsimage;
public FSImageUploader(FSImage fsimage) {
this.fsimage = fsimage;
}
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
// 这里指定NameNode的地址
channel.connect(new InetSocketAddress("localhost", 9000));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
boolean uploading = true;
while (uploading) {
selector.select();
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
keysIterator.remove();
// 1.连接事件
if (key.isConnectable()) {
channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
// 上传fsimage
ByteBuffer buffer = ByteBuffer.wrap(fsimage.getFsimageJson().getBytes());
System.out.println("准备上传fsimage文件数据,大小为:" + buffer.capacity());
channel.write(buffer);
}
channel.register(selector, SelectionKey.OP_READ);
}
// 2.可读事件,即接收到了响应
else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
channel = (SocketChannel) key.channel();
int count = channel.read(buffer);
if (count > 0) {
System.out.println("上传fsimage文件成功,响应消息为:" +
new String(buffer.array(), 0, count));
channel.close();
uploading = false;
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//...省略关闭I/O相关代码
}
}
}
可以看到,Java NIO的代码大部分都是一些 Boilerplate Code ,也就是说会遵照一定的模板,我们只需在相应的位置填充业务逻辑即可:
- 当BackupNode监听到channel上的连接事件后,就需要发送fsimage文件内容;
- 当BackupNode监听到channel上的可读事件后,就需要读取NameNode的响应。
1.2 Server端
NameNode启动后,需要基于Java NIO,作为Server端提供长连接服务,我通过组件 FSImageUploadServer 组件来实现:
/**
* NameNode启动类
*/
public class NameNode {
// fsimage同步组件
private FSImageUploadServer fsimageUploadServer;
public static void main(String[] args) throws Exception {
NameNode namenode = new NameNode();
namenode.init();
namenode.start();
}
private void init() {
this.fsimageUploadServer = new FSImageUploadServer();
}
private void start() throws Exception {
this.fsimageUploadServer.start();
}
}
FSImageUploadServer首先创建了一个ServerSocketChannel对象,绑定到本地的9000端口,然后注册Selector并监听OP_ACCEPT
事件。FSImageUploadServer线程启动后,当监听到Channel上有各类事件发生时,会调用handleRequest
方法进行处理:
/**
* fsimage server
*/
public class FSImageUploadServer extends Thread {
private Selector selector;
public FSImageUploadServer() {
this.init();
}
private void init() {
ServerSocketChannel serverSocketChannel = null;
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(9000), 100);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
System.out.println("FSImageUploadServer启动,监听9000端口......");
while (true) {
try {
selector.select();
// 监听到事件后,根据事件类型处理
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
keysIterator.remove();
try {
handleRequest(key);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
//...
}
我们来看下FSImageUploadServer.handleRequest()
方法,主要就是针对三种不同的事件类型作相应的处理,事件的处理流转有点类似状态机:
// FSImageUploadServer.java
private void handleRequest(SelectionKey key) throws IOException {
// 1.建立连接
if (key.isAcceptable()) {
handleConnectRequest(key);
}
// 2.接收到请求
else if (key.isReadable()) {
handleReadableRequest(key);
}
// 3.发送响应
else if (key.isWritable()) {
handleWritableRequest(key);
}
}
/**
* 处理BackupNode连接请求
*/
private void handleConnectRequest(SelectionKey key) throws IOException {
SocketChannel channel = null;
try {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
channel = serverSocketChannel.accept();
if (channel != null) {
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
} catch (Exception e) {
e.printStackTrace();
if (channel != null) {
channel.close();
}
}
}
/**
* 处理发送fsimage文件的请求
*/
private void handleReadableRequest(SelectionKey key) throws IOException {
SocketChannel socketChannel = null;
try {
String fsimageFile = "C:\\Users\\Ressmix\\Desktop\\editslog\\fsimage.meta";
RandomAccessFile raf = null;
FileOutputStream os = null;
FileChannel fileChannel = null;
try {
socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int total = 0;
int count = -1;
if ((count = socketChannel.read(buffer)) > 0) {
File file = new File(fsimageFile);
if (file.exists()) {
file.delete();
}
raf = new RandomAccessFile(fsimageFile, "rw");
os = new FileOutputStream(raf.getFD());
fileChannel = os.getChannel();
total += count;
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
} else {
socketChannel.close();
}
while ((count = socketChannel.read(buffer)) > 0) {
total += count;
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
}
if (total > 0) {
System.out.println("接收fsimage文件以及写入本地磁盘完毕......");
fileChannel.force(false);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
} finally {
if (os != null) {
os.close();
}
if (raf != null) {
raf.close();
}
if (fileChannel != null) {
fileChannel.close();
}
}
} catch (Exception e) {
e.printStackTrace();
if (socketChannel != null) {
socketChannel.close();
}
}
}
/**
* 处理返回响应给BackupNode
*/
private void handleWritableRequest(SelectionKey key) throws IOException {
SocketChannel channel = null;
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("SUCCESS".getBytes());
buffer.flip();
channel = (SocketChannel) key.channel();
channel.write(buffer);
System.out.println("fsimage上传完毕,返回响应SUCCESS给backupnode......");
channel.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
if (channel != null) {
channel.close();
}
}
}
二、NameNode宕机恢复
NameServer接收到fsimage后,就可以在启动时基于fsimage快照完成元数据的恢复了。这个工作由FSNameSystem组件来完成,整个流程分为三步:
- 加载fsimage:从本地磁盘查找fsimage文件,恢复内存文件目录树;
- 加载checkpoint信息:从本地磁盘查找checkpoint文件,恢复checkpoint信息;
- 回放edits log日志:读取checkpoint之后的edits log日志,进行回放。
/**
* 负责管理元数据的核心组件
*/
public class FSNameSystem {
// 负责管理内存文件目录树的组件
private FSDirectory directory;
// 负责管理edits log写入磁盘的组件
private FSEditlog editlog;
// 最近一次checkpoint更新到的txid
private long checkpointTxid = 0L;
public FSNameSystem() {
this.directory = new FSDirectory();
this.editlog = new FSEditlog();
// 加载fsimage恢复元数据
recoverNamespace();
}
/**
* 基于fsimage快照和edits log恢复元数据
*/
public void recoverNamespace() {
try {
// 1.加载fsimage文件
loadFSImage();
// 2.加载chekpoint信息
loadCheckpointTxid();
// 3.回放edits log
loadEditLog();
} catch (Exception e) {
e.printStackTrace();
}
}
//...
}
2.1 加载fsimage文件
我们先来看加载fsimage文件的流程,就是基于Java NIO读取磁盘上的fsimage快照文件,然后反序列化生成内存文件目录树:
public class FSNameSystem {
// 负责管理内存文件目录树的组件
private FSDirectory directory;
/**
* 基于fsimage快照生成内存文件目录树
*/
private void loadFSImage() throws Exception {
FileInputStream in = null;
FileChannel channel = null;
try {
// 这里可以基于配置文件读取
String path = "C:\\Users\\Ressmix\\Desktop\\editslog\\fsimage.meta";
File file = new File(path);
if (!file.exists()) {
System.out.println("fsimage文件当前不存在,不进行恢复.......");
return;
}
in = new FileInputStream(path);
channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
int count = channel.read(buffer);
buffer.flip();
String fsimageJson = new String(buffer.array(), 0, count);
System.out.println("恢复fsimage文件中的数据:" + fsimageJson);
FSDirectory.INode dirTree = JSONObject.parseObject(fsimageJson, FSDirectory.INode.class);
// 生成目录树
directory.setDirTree(dirTree);
} finally {
if (in != null) {
in.close();
}
if (channel != null) {
channel.close();
}
}
}
//...
}
2.2 加载checkpoint信息
我们再来看加载checkpoint信息的流程,我在上一章讲过,BackupNode会通过RPC将checkpoint信息发送给NameNode。NameNode先在内存中维护checkpoint的txid信息,当停机时会将checkpoint信息持久化到磁盘上。
所以,加载checkpoint信息的流程就是读取磁盘上的checkpoint文件,然后恢复内存中的checkpoint的txid信息:
/**
* 负责管理元数据的核心组件
*/
public class FSNameSystem {
// 最近一次checkpoint对应的txid
private long checkpointTxid = 0L;
/**
* 加载checkpoint txid
*/
private void loadCheckpointTxid() throws Exception {
FileInputStream in = null;
FileChannel channel = null;
try {
String path = "C:\\Users\\Ressmix\\Desktop\\editslog\\checkpoint-txid.meta";
File file = new File(path);
if (!file.exists()) {
System.out.println("checkpoint txid文件不存在,不进行恢复.......");
return;
}
in = new FileInputStream(path);
channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
buffer.flip();
long checkpointTxid = Long.valueOf(new String(buffer.array(), 0, count));
System.out.println("恢复checkpoint txid:" + checkpointTxid);
// 恢复内存中的txid,也就是最近一次执行checkpoint时,保存的最新edits log的txid
this.checkpointTxid = checkpointTxid;
} finally {
if (in != null) {
in.close();
}
if (channel != null) {
channel.close();
}
}
}
//...
}
2.3 回放edits log日志
最后,NameNode还需要进行edits log日志的回放,因为fsimage中只有checkpoint时间点之前的edits log,也就是小于等于checkpointTxid的日志。所以,NameNode还需要读取磁盘中checkpointTxid之后的edits log进行回放:
/**
* 负责管理元数据的核心组件
*/
public class FSNameSystem {
// 负责管理内存文件目录树的组件
private FSDirectory directory;
// 负责管理edits log写入磁盘的组件
private FSEditlog editlog;
// 最近一次checkpoint对应的txid
private long checkpointTxid = 0L;
/**
* 加载和回放editlog
*/
private void loadEditLog() throws Exception {
// 这里可以基于配置文件读取
File dir = new File("C:\\Users\\Ressmix\\Desktop\\editslog\\");
if (!dir.exists() || dir.listFiles() == null) {
return;
}
List<File> files = new ArrayList<>();
for (File file : dir.listFiles()) {
if (file.getName().contains("edits")) {
files.add(file);
}
}
// 按edits log的文件名从小到大排序
Collections.sort(files, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
Integer o1StartTxid = Integer.valueOf(o1.getName().split("-")[1]);
Integer o2StartTxid = Integer.valueOf(o2.getName().split("-")[1]);
return o1StartTxid - o2StartTxid;
}
});
if (files == null || files.size() == 0) {
System.out.println("当前没有任何editlog文件,不进行恢复......");
return;
}
for (File file : files) {
if (file.getName().contains("edits")) {
System.out.println("准备恢复editlog文件中的数据:" + file.getName());
String[] splitedName = file.getName().split("-");
long startTxid = Long.valueOf(splitedName[1]);
long endTxid = Long.valueOf(splitedName[2].split("[.]")[0]);
// 如果是checkpointTxid之后的那些editlog都要加载出来
if (endTxid > checkpointTxid) {
String currentEditsLogFile = "C:\\Users\\Ressmix\\Desktop\\editslog\\edits-"
+ startTxid + "-" + endTxid + ".log";
List<String> editsLogs = Files.readAllLines(Paths.get(currentEditsLogFile),
StandardCharsets.UTF_8);
for (String editLogJson : editsLogs) {
JSONObject editLog = JSONObject.parseObject(editLogJson);
long txid = editLog.getLongValue("txid");
if (txid > checkpointTxid) {
System.out.println("准备回放editlog:" + editLogJson);
// 回放到内存里去
String op = editLog.getString("OP");
if (op.equals("MKDIR")) {
String path = editLog.getString("PATH");
directory.mkdir(path);
}
}
}
}
}
}
}
//...
}
三、Edits log清理
最后,我这里再补充一下NameNode的Edits log清理机制。我们思考一下,既然NameNode已经接收到了fsimage快照,那么checkpoint之前的那些edits log是不是应该都清理掉已节省磁盘空间呢?
所以,我们需要有一个后台线程 EditLogCleaner 定期做Edits log的清理工作:
public class FSNameSystem {
// 负责清理Edits log的组件
private EditLogCleaner editLogCleaner;
public FSNameSystem() {
this.directory = new FSDirectory();
this.editlog = new FSEditlog();
// 启动一个清理线程
this.editLogCleaner = new EditLogCleaner(this);
editLogCleaner.start();
recoverNamespace();
}
//...
}
3.1 EditLogCleaner
我们来看下EditLogCleaner的具体实现,思路非常简单,就是根据FSNameSystem保存的checkpoint信息,去磁盘文件上查找那些checkpoint之前的edits log,然后删除掉:
/**
* Edits Log清理线程
*/
public class EditLogCleaner extends Thread {
// 默认每隔30min清理一次
private static final Long EDIT_LOG_CLEAN_INTERVAL = 30 * 1000L;
private FSNameSystem nameSystem;
public EditLogCleaner(FSNameSystem nameSystem) {
super();
this.nameSystem = nameSystem;
}
@Override
public void run() {
System.out.println("edits log日志文件后台清理线程启动......");
while (true) {
try {
Thread.sleep(EDIT_LOG_CLEAN_INTERVAL);
List<String> flushedTxids = nameSystem.getEditsLog().getFlushedTxids();
if (flushedTxids != null && flushedTxids.size() > 0) {
// 获取最近一次checkpoint
long checkpointTxid = nameSystem.getCheckpointTxid();
for (String flushedTxid : flushedTxids) {
long startTxid = Long.valueOf(flushedTxid.split("_")[0]);
long endTxid = Long.valueOf(flushedTxid.split("_")[1]);
// 在最近一次checkpoint之前的edits log文件都要删除
if (checkpointTxid >= endTxid) {
File file = new File("C:\\Users\\Ressmix\\Desktop\\editslog\\edits-"
+ startTxid + "-" + endTxid + ".log");
if (file.exists()) {
file.delete();
System.out.println("发现editlog日志文件不需要,进行删除:" + file.getPath());
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
四、总结
本章,我对fsimage传输与NameNode的宕机恢复元数据的机制进行了讲解。fsimage快照传输我使用了Java NIO的通信机制,NIO代码本身没什么难的,主要是一些Boilerplate Code,实际工作中开发得多了也就熟悉了。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。