简单介绍
Netty
里的直接缓冲区其实是用了NIO
的DirectByteBuffer
,那具体他是怎么做的呢,为什么今天来看看细节的东西,有个更好的理解,看看他是怎么申请内存,怎么释放内存的。
简单的例子
其实就这么一句简单的,就可以申请直接缓冲区,也就是堆外内存,不属于Java
管的,属于操作系统的。
ByteBuf byteBuf = Unpooled.directBuffer(1000);
经过一些列的跟踪,追踪到这里,开始调用ByteBuffer
的方法了:
DirectByteBuffer
我们来看看这个构造函数做了什么事,才好理解他是怎么申请内存的:
DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();//是否页对齐
int ps = Bits.pageSize();//也大小,默认是4096字节
long size = Math.max(1L, (long)cap + (pa ? ps : 0));//对齐的话大小就有页大小+cap,即实际的申请的内存大小大于初始的容量
Bits.reserveMemory(size, cap);//尝试申请内存
long base = 0;
try {
base = UNSAFE.allocateMemory(size);//分配内存,返回基地址
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);//没内存可分配,回滚修改的内存数据
throw x;
}
UNSAFE.setMemory(base, size, (byte) 0);//设置内存里的初始值为0
if (pa && (base % ps != 0)) {//地址对齐,且基地址不是页大小的整数倍
// Round up to page boundary
address = base + ps - (base & (ps - 1));//将地址改为页大小的整数倍,即是某个页的起始地址 (base & (ps - 1))这个是base对ps取余
} else {
address = base;//地址就是基地址
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));//创建内存回收器,传的是基地址
att = null;//没附件
}
内存页对齐
这个其实涉及到比较底层的东西,内存的分页,缓存等等。不过可以简单的理解为对齐可以提高存取效率,用空间换时间,如果你的数据放在两个不同的页里面,那他取的时候就需要取两次,如果放在同一个页中,就只需要一次,这个就是页对齐的一个好处,当然还有其他的用途,这个要去理解内存的分页机制,虚拟内存等等,我就不多说了。这里如果使用了页对齐的话,要申请的size
就会加上一个分页大小,比如4K字节。
内存可能就这么分配了:
Bits.reserveMemory
这个就是申请内存啦,首先会获取设置的直接内存分配上限MAX_MEMORY
,然后就看是否还能申请内存,其实这里还只是修改了一些记录值。如果不能,就会等待一次内存回收,如果发现有回收的,就再次看是否能申请内存,如果发现没有回收的,就调用GC
,然后执行循环,每次间隔一定时间去看等待一次内存回收,如果等9
次没有内存释放,也没有申请成功,就抛出OutOfMemoryError
异常。
static void reserveMemory(long size, int cap) {
if (!MEMORY_LIMIT_SET && VM.initLevel() >= 1) {
MAX_MEMORY = VM.maxDirectMemory();//获取分配内存上限
MEMORY_LIMIT_SET = true;
}
// optimist!看是否还能申请内存,成功就返回
if (tryReserveMemory(size, cap)) {
return;
}
final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();
boolean interrupted = false;
try {
boolean refprocActive;
do {
try {
refprocActive = jlra.waitForReferenceProcessing();//等待释放内存的处理,最终调用的是Reference的waitForReferenceProcessing
} catch (InterruptedException e) {
// Defer interrupts and keep trying.
interrupted = true;
refprocActive = true;
}
if (tryReserveMemory(size, cap)) {//再次尝试申请
return;
}
} while (refprocActive);
// trigger VM's Reference processing
System.gc();//如果没有成功,就启动gc
long sleepTime = 1;
int sleeps = 0;
while (true) {
if (tryReserveMemory(size, cap)) {
return;
}
if (sleeps >= MAX_SLEEPS) {//尝试等待9次睡眠,大约0.5秒,如果还没有内存,就退出循环
break;
}
try {
if (!jlra.waitForReferenceProcessing()) {//如果没有释放内存就sleep
Thread.sleep(sleepTime);
sleepTime <<= 1;//睡眠时间x2
sleeps++;
}
} catch (InterruptedException e) {
interrupted = true;
}
}
// no luck 没内存可申请,抛异常
throw new OutOfMemoryError("Direct buffer memory");
} finally {
if (interrupted) {
// don't swallow interrupts
Thread.currentThread().interrupt();
}
}
}
可以看下这个大致的示意图:
Bits.tryReserveMemory
其实只是做一些属性的修改,如果最大容量MAX_MEMORY
-申请的总容量totalCap
大于等于申请的容量cap
的话,就表示能申请,然后修改属性,其实实际申请的是size
,如果用上了页对齐,就会比cap
大,所以基本上实际使用的总容量是比申请的总容量大的:
//MAX_MEMORY只是限制申请的容量而不是实际的使用量,如果用了页对齐的话,实际使用量是会比申请的容量大的 即size>=cap
private static boolean tryReserveMemory(long size, int cap) {
// -XX:MaxDirectMemorySize limits the total capacity rather than the
// actual memory usage, which will differ when buffers are page
// aligned.
long totalCap;//总共申请的容量
while (cap <= MAX_MEMORY - (totalCap = TOTAL_CAPACITY.get())) {//还有能申请容量
if (TOTAL_CAPACITY.compareAndSet(totalCap, totalCap + cap)) {
RESERVED_MEMORY.addAndGet(size);//实际使用的总容量
COUNT.incrementAndGet();//申请次数增加
return true;
}
}
return false;//不能申请了
}
UNSAFE.allocateMemory
其实这个是调用了JNI
的方法:
看看本地的方法:
其实最终调用了malloc
方法申请内存。
Deallocator
这个就是释放内存的任务,实现了Runnable
接口,最后会执行run
来进行内存的释放:
private static class Deallocator
implements Runnable
{
private long address;
private long size;
private int capacity;
//传的address是基地址
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}
public void run() {
if (address == 0) {
// Paranoia
return;
}
UNSAFE.freeMemory(address);//释放内存
address = 0;
Bits.unreserveMemory(size, capacity);
}
}
private final Cleaner cleaner;
public Cleaner cleaner() { return cleaner; }
UNSAFE.freeMemory
这个跟上面的申请类似:
本地方法:
最终也是调用了free
方法。
Cleaner
这个就是我们的清除器,是虚引用PhantomReference
类型的,先看下这个类,其实是个双向链表,主要还是clean
方法:
public class Cleaner
extends PhantomReference<Object>
{
// Dummy reference queue, needed because the PhantomReference constructor
// insists that we pass a queue. Nothing will ever be placed on this queue
// since the reference handler invokes cleaners explicitly.
// 引用队列
private static final ReferenceQueue<Object> dummyQueue = new ReferenceQueue<>();
// Doubly-linked list of live cleaners, which prevents the cleaners
// themselves from being GC'd before their referents
// 双向链表,避免自身被GC,但是只有头指针
private static Cleaner first = null;
private Cleaner
next = null,
prev = null;
//同步方法,头插法
private static synchronized Cleaner add(Cleaner cl) {
if (first != null) {
cl.next = first;
first.prev = cl;
}
first = cl;
return cl;
}
private static synchronized boolean remove(Cleaner cl) {
// If already removed, do nothing 删除了的下一个指向自己
if (cl.next == cl)
return false;
// Update list
if (first == cl) {
if (cl.next != null)
first = cl.next;//first指向下一个
else
first = cl.prev;//first=null
}
if (cl.next != null)//更新cl的前驱后继连接关系
cl.next.prev = cl.prev;
if (cl.prev != null)
cl.prev.next = cl.next;
// Indicate removal by pointing the cleaner to itself 删除的前驱和后继都指向自己
cl.next = cl;
cl.prev = cl;
return true;
}
//任务
private final Runnable thunk;
//引用对象和任务
private Cleaner(Object referent, Runnable thunk) {
super(referent, dummyQueue);
this.thunk = thunk;
}
/** 清除器,任务不能为空
* Creates a new cleaner.
*
* @param ob the referent object to be cleaned
* @param thunk
* The cleanup code to be run when the cleaner is invoked. The
* cleanup code is run directly from the reference-handler thread,
* so it should be as simple and straightforward as possible.
*
* @return The new cleaner
*/
public static Cleaner create(Object ob, Runnable thunk) {
if (thunk == null)
return null;
return add(new Cleaner(ob, thunk));//增加结点
}
/** 执行清除任务
* Runs this cleaner, if it has not been run before.
*/
public void clean() {
if (!remove(this))//已经删除过的
return;
try {
thunk.run();
} catch (final Throwable x) {
AccessController.doPrivileged(new PrivilegedAction<>() {
public Void run() {
if (System.err != null)
new Error("Cleaner terminated abnormally", x)
.printStackTrace();
System.exit(1);
return null;
}});
}
}
}
构造函数
构造函数就是接受一个引用对象和一个任务,其实这个任务就是清除任务Deallocator
,
private Cleaner(Object referent, Runnable thunk) {
super(referent, dummyQueue);
this.thunk = thunk;
}
public static Cleaner create(Object ob, Runnable thunk) {
if (thunk == null)
return null;
return add(new Cleaner(ob, thunk));//增加结点
}
clean
这个就是关键啦,也就是执行清除任务Deallocator
的run
方法,释放内存。但是这个方法什么时候被调用呢,这个就是要知道虚引用的用法了,只要引用对象被释放了,这个虚引用就会被添加到引用队列里,但是在这个之前会先放入一个pendingList
引用链表,然后引用类Reference
会有一个守护线程ReferenceHandler
会去调用processPendingReferences
方法遍历是否存在pendingList
,就有会返回,这个是本地方法做的,然后去判断具体引用类型,如果是Cleaner
类型,就会执行clean
方法,其他的就会放入引用队列,这样我们就可以获取引用队列里的元素,进行后处理了,我们来看看这个守护线程ReferenceHandler
:
其实就是无限调用外部Reference
的processPendingReferences
,里面就是真正判断类似和执行相应方法的地方啦,这里能看出来pendingList
应该是个链表,可以循环获取后续的引用:
总结
现在我们知道了netty的直接缓冲区到底是怎么分配内存和释放内存的了,释放的时候其实是用了虚引用的作用,相当于在引用对象被释放的时候会有回调,这个时候就可以做一些释放内存的事了。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。