2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104087624

windows下的非阻塞设置

在NIO里面,我们设置一般都会用非阻塞,也就是这样设置erverSocketChannel.configureBlocking(false);.,具体他做了什么呢,我们一起来看看,最终可以跟到一个JNI的本地方法:

      public static native void configureBlocking(FileDescriptor fd,
                                                    boolean blocking)
            throws IOException;

我们可以在JDK的源码里找到IOUtil.c,里面的方法就根据传入的参数blocking设置了相应的值:

202309132206335201.png
主要是这个函数ioctlsocket,具体可以看这篇文章

windows下的select

最后跟进去是也是个JNI的本地方法,还不是真正的windows下的select

     private native int poll0(long pollAddress, int numfds,
                 int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
    
    pollAddress表示文件描述符数组的首地址
    numfds表示有多少个文件描述符要监听
    readFds表示监听返回的读事件的文件描述符数组
    writeFds表示监听返回的写事件的文件描述符数组
    exceptFds表示监听返回的异常事件的文件描述符数组
    timeout表示超时时间

这个方法跟我上一篇讲Linuxselect差不多,也是把要监听的读写异常文件描述符数组传给操作系统调用,然后操作系统再去进行设置,最后返回有事件的个数,然后在从这些数组里面轮询判断到底是哪个文件描述符的哪个事件。底层调用的是WindowsSelectorImpl.cJava_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0方法:

    #define FD_SETSIZE 1024
    
    typedef struct {
        jint fd;//文件描述符
        jshort events;//监听事件
    } pollfd;
    
    
    typedef struct {
      u_int    fd_count;                 // 多少个文件描述符
      SOCKET   fd_array[FD_SETSIZE];     // 文件描述符数组
    } FD_SET;
    
    
    
    JNIEXPORT jint JNICALL
    Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
                                       jlong pollAddress, jint numfds,
                                       jintArray returnReadFds, jintArray returnWriteFds,
                                       jintArray returnExceptFds, jlong timeout)
    {
        DWORD result = 0;
        pollfd *fds = (pollfd *) pollAddress;//文件描述符数组的的首地址
        
        int i;
        FD_SET readfds, writefds, exceptfds;//三种事件的集合
        struct timeval timevalue, *tv;
        static struct timeval zerotime = {0, 0};
        int read_count = 0, write_count = 0, except_count = 0;
    
    #ifdef _WIN64
        int resultbuf[FD_SETSIZE + 1];//结果数组1025个,第一个是数量,作为后面数据的缓存
    #endif
    	//超时处理
        if (timeout == 0) {
            tv = &zerotime;
        } else if (timeout < 0) {
            tv = NULL;
        } else {
            jlong sec = timeout / 1000;
            tv = &timevalue;
            //
            // struct timeval members are signed 32-bit integers so the
            // signed 64-bit jlong needs to be clamped
            //
            if (sec > INT_MAX) {
                tv->tv_sec  = INT_MAX;
                tv->tv_usec = 0;
            } else {
                tv->tv_sec  = (long)sec;
                tv->tv_usec = (long)((timeout % 1000) * 1000);
            }
        }
    
        /* Set FD_SET structures required for select */
        for (i = 0; i < numfds; i++) {
            if (fds[i].events & POLLIN) {//设置读事件
               readfds.fd_array[read_count] = fds[i].fd;
               read_count++;
            }
            if (fds[i].events & (POLLOUT | POLLCONN))//设置写事件
            {
               writefds.fd_array[write_count] = fds[i].fd;
               write_count++;
            }
            exceptfds.fd_array[except_count] = fds[i].fd;//剩下的设置异常事件
            except_count++;
        }
    
        readfds.fd_count = read_count;
        writefds.fd_count = write_count;
        exceptfds.fd_count = except_count;
    
        /* Call select */ //如果出错的话
        if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))
                                                                 == SOCKET_ERROR) {
            /* Bad error - this should not happen frequently */
            /* Iterate over sockets and call select() on each separately */
            FD_SET errreadfds, errwritefds, errexceptfds;
            readfds.fd_count = 0;
            writefds.fd_count = 0;
            exceptfds.fd_count = 0;
            for (i = 0; i < numfds; i++) {
                /* prepare select structures for the i-th socket */
                errreadfds.fd_count = 0;
                errwritefds.fd_count = 0;
                if (fds[i].events & POLLIN) {
                   errreadfds.fd_array[0] = fds[i].fd;
                   errreadfds.fd_count = 1;
                }
                if (fds[i].events & (POLLOUT | POLLCONN))
                {
                    errwritefds.fd_array[0] = fds[i].fd;
                    errwritefds.fd_count = 1;
                }
                errexceptfds.fd_array[0] = fds[i].fd;
                errexceptfds.fd_count = 1;
    
                /* call select on the i-th socket 每一个都会去执行select*/
                if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
                                                                 == SOCKET_ERROR) {
                    /* This socket causes an error. Add it to exceptfds set */
                    exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
                    exceptfds.fd_count++;
                } else {//处理没有报错
                    /* This socket does not cause an error. Process result */
                    if (errreadfds.fd_count == 1) {
                        readfds.fd_array[readfds.fd_count] = fds[i].fd;
                        readfds.fd_count++;
                    }
                    if (errwritefds.fd_count == 1) {
                        writefds.fd_array[writefds.fd_count] = fds[i].fd;
                        writefds.fd_count++;
                    }
                    if (errexceptfds.fd_count == 1) {
                        exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
                        exceptfds.fd_count++;
                    }
                }
            }
        }
    
        /* Return selected sockets. */
        /* Each Java array consists of sockets count followed by sockets list */
    //设置到返回的数组中
    #ifdef _WIN64 
        resultbuf[0] = readfds.fd_count;//64位的第一个是放数量
        for (i = 0; i < (int)readfds.fd_count; i++) {
            resultbuf[i + 1] = (int)readfds.fd_array[i];
        }
        (*env)->SetIntArrayRegion(env, returnReadFds, 0,
                                  readfds.fd_count + 1, resultbuf);
    
        resultbuf[0] = writefds.fd_count;
        for (i = 0; i < (int)writefds.fd_count; i++) {
            resultbuf[i + 1] = (int)writefds.fd_array[i];
        }
        (*env)->SetIntArrayRegion(env, returnWriteFds, 0,
                                  writefds.fd_count + 1, resultbuf);
    
        resultbuf[0] = exceptfds.fd_count;
        for (i = 0; i < (int)exceptfds.fd_count; i++) {
            resultbuf[i + 1] = (int)exceptfds.fd_array[i];
        }
        (*env)->SetIntArrayRegion(env, returnExceptFds, 0,
                                  exceptfds.fd_count + 1, resultbuf);
    #else
        (*env)->SetIntArrayRegion(env, returnReadFds, 0,
                                  readfds.fd_count + 1, (jint *)&readfds);
    
        (*env)->SetIntArrayRegion(env, returnWriteFds, 0,
                                  writefds.fd_count + 1, (jint *)&writefds);
        (*env)->SetIntArrayRegion(env, returnExceptFds, 0,
                                  exceptfds.fd_count + 1, (jint *)&exceptfds);
    #endif
        return 0;
    }

真正的windows下的select就是select(0 , &readfds, &writefds, &exceptfds, tv),跟Linux的很像吧。具体可以看看这篇文章

上面JNI的看起来好像一大堆,其实他做的事情跟我们前面讲的Linux类似,只是加了JNI的操作。首先我们在Java中调用了poll0,就是最重的JNI方法:

202309132206344842.png
传入的就是文件描述符的首地址,然后是数量,要返回的三个集合,超时时间,具体的Java代码我后面会分析,我先贴出来一部分的。
pollArrayAddress可以理解为本地地址:

202309132206352553.png

202309132206360404.png

202309132206368675.png
三个集合:

202309132206376516.png
对应的传入本地的参数就是红框中的:

202309132206382557.png
然后转换成数组首地址,声明集合:

202309132206390918.png
进行集合的初始化:

202309132206401419.png
后面如果有报错的话会针对每一个文件描述符去进行监听,然后设置相应的集合:

2023091322064132510.png
否则就直接设置返回的事件到对应的数组给Java层,如果有宏定义_WIN64的则第一个是总数量,后面才是文件描述符,否则即直接把集合转换成jint类型,其实第一个也是数量,因为前面集合FD_SET的结构体定义里有fd_count属性:

2023091322064257911.png

pollAddress

上面有讲过这个,但是其实不太好理解,这个东西到底是什么,我们来看看源码,开始是在创建选择器实现类的时候会创建PollArrayWrapper

2023091322064343412.png
然后创建本地对象AllocatedNativeObject,也就是内存是本地的,不是Java堆的,Java只拿到本地返回的内存地址:

2023091322064537913.png
最后是调用这个,会进行内存页对齐,总是页大小4K的倍数,反正你知道最终可以操作本地内存就行:

2023091322064603714.png
然后向底层申请内存:

2023091322064675215.png
最后JNI方法:

2023091322064757916.png
其实本地是调用了unsafe.cpp的方法:

2023091322064822117.png

2023091322064883018.png
其实就是分配内存空间,然后返回内存地址。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

Liuux下的Epoll

如果是Linux的JDK,我们看看到EPoll.java中的一些常量和三个核心方法:

        /**
         * typedef union epoll_data {
         *     void *ptr;
         *     int fd;
         *     __uint32_t u32;
         *     __uint64_t u64;
         *  } epoll_data_t;
         *
         * struct epoll_event {
         *     __uint32_t events;
         *     epoll_data_t data;
         * }
         */
        private static final int SIZEOF_EPOLLEVENT   = eventSize();
        private static final int OFFSETOF_EVENTS     = eventsOffset();
        private static final int OFFSETOF_FD         = dataOffset();
    
        // opcodes
        static final int EPOLL_CTL_ADD  = 1;
        static final int EPOLL_CTL_DEL  = 2;
        static final int EPOLL_CTL_MOD  = 3;
    
        // events
        static final int EPOLLIN   = 0x1;
        static final int EPOLLOUT  = 0x4;
    
        // flags
        static final int EPOLLONESHOT   = (1 << 30);
    
    	static native int create() throws IOException;
    	
    	static native int ctl(int epfd, int opcode, int fd, int events);
    	
    	static native int wait(int epfd, long pollAddress, int numfds, int timeout)

这个跟我前面一篇的讲的是对应的,所以就不多说了,我们看看JNI里怎么实现的:

2023091322064957619.png
这个就更加好理解了,也不多说了。

总结

这里只是介绍了一些跟前面一篇相关的东西,了解下JNI底层跟操作系统如何打交道,具体的所有操作都是由系统调用来实现的,Java只是做了个封装,通过这两篇的讲解,能知道多路复用到底是怎么回事,事件又是怎么传到Java层的,为什么大多情况下select没有epoll性能好,当然其实讲的也比较浅,真的要弄明白底层,可能要深入Linux内核源码啦,有兴趣的可以研究下。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文