2024-04-05  阅读(2)
原文作者:文先生的博客 原文地址: http://wenfh2020.com/2020/04/13/redis-multithreading-mode/

Redis 6.0 版本增加了多线程并发处理网络 IO 功能,主要是为了利用多核资源,减轻主线程负载,提高程序整体性能。



1. 架构

1.1. 整体框架

Redis 是 多进程 + 多线程 混合并发模型。

  • 子进程持久化:重写 aof 文件 / 保存 rdb 文件。
  • 多线程:主线程 + 后台线程 + 新增网络 IO 线程(Redis 6.0)。

详细参考:《[Redis] 浅析 Redis 并发模型

202404052230159661.png


1.2. 网络 IO 框架

1.2.1. 主线程异步网络 IO

下图描述了 Redis 客户端与服务端主线程异步通信流程,有兴趣的朋友可以参考:《[redis 源码走读] 异步通信流程-单线程》,这里不详细展开了。

202404052230164952.png


1.2.2. 多线程网络 IO 框架

Redis 6.0 以前,主线程处理网络 IO;Redis 6.0 增加了多线程处理网络 IO 功能,详见下图。

  • 如果没开启多线程,那么 Redis 只会使用主线程处理网络 IO,主线程单线程处理网络 IO 是串行的。
  • 为了保证主逻辑处理方式整体不变,多线程 IO 工作方式,不允许同时并发读写操作,同一时刻只允许读或只允许写。
  • 如果开启了多线程,而且等待处理的 client 数量很少,新增的网络 IO 线程会被挂起,仍然使用主线程工作;否则启用多线程工作,将等待的 clients,平均分配给多个线程(主线程+新增线程)并行处理。
  • 任务分配完以后,主线程将处理自己的任务,并等待新增线程都处理完任务后,才会执行下一个步骤的其它操作,这样做的目的是为了保证整体逻辑串行;不因为引入多线程处理方式改变了原来的主逻辑,尽力将多线程并行逻辑的影响减少到最小。

202404052230207943.png


2. 配置

io-threads 线程配置,redis.conf 配置文件默认是不开放的,默认只有一个线程在工作,这个线程就是主线程

  • io-threads 4,如果开放多线程配置,那么 IO 处理线程默认共有 4 个,包括主线程。也就是说,新增的 IO 线程有 3 个。
  • io-threads-do-reads no,是否开启读操作多线程模式;因为 Redis 作为缓存服务,读入数据比较小,写出数据比较多,所以读操作非必要不需要开启多线程模式。
 
    # redis.conf
    
    # 配置多线程处理线程个数,默认 4。
    # io-threads 4
    #
    # 多线程是否处理读事件,默认关闭。
    # io-threads-do-reads no

3. 源码剖析

网络读写操作大同小异,下面根据源码剖析写操作。

3.1. 主线程

  • 根据需要网络 IO 的 clients 数量决定是否需要启动多线程模式。
  • 如果开启了多线程模式,将需要 IO 的 clients 分配给多个线程进行工作。
  • 主线程对分配给自己的 clients 进行对应的读写任务。
  • 等待其它子线程都完成任务后,再进行其它操作。
 
    int handleClientsWithPendingWritesUsingThreads(void) {
        int processed = listLength(server.clients_pending_write);
        if (processed == 0) return 0;
    
        // 如果 client 很少,关闭多线程模式,用主线程处理写操作。
        if (stopThreadedIOIfNeeded()) {
            // 主线程处理写操作。
            return handleClientsWithPendingWrites();
        }
    
        if (!io_threads_active) startThreadedIO();
    
        // 主线程分配任务,将 client 按取模的方式分配给各个线程。
        listIter li;
        listNode *ln;
        listRewind(server.clients_pending_write,&li);
        int item_id = 0;
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            int target_id = item_id % server.io_threads_num;
            listAddNodeTail(io_threads_list[target_id],c);
            item_id++;
        }
    
        // 标识写操作。
        io_threads_op = IO_THREADS_OP_WRITE;
    
        // 设置 io_threads_pending 数据,
        // 后面根据这个数据确定子线程是否已完成任务。
        for (int j = 1; j < server.io_threads_num; j++) {
            int count = listLength(io_threads_list[j]);
            io_threads_pending[j] = count;
        }
    
        // 主线程处理第一个队列。
        listRewind(io_threads_list[0],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            // 写数据,发送给回复给客户端。
            writeToClient(c,0);
        }
        listEmpty(io_threads_list[0]);
    
        // 等待所有子线程完成任务。
        while(1) {
            unsigned long pending = 0;
            for (int j = 1; j < server.io_threads_num; j++)
                pending += io_threads_pending[j];
            if (pending == 0) break;
        }
    
        listRewind(server.clients_pending_write,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
    
            // 如果有的 client 数据还没发送完(异步),那么注册写事件,下次再触发发送。
            if (clientHasPendingReplies(c)
                && connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) {
                freeClientAsync(c);
            }
        }
        listEmpty(server.clients_pending_write);
        return processed;
    }

3.2. 子线程

  • 主线程已分配给子线程 clients。
  • 子线程遍历 clients 执行对应的读写操作。
 
    void *IOThreadMain(void *myid) {
        long id = (unsigned long)myid;
    
        while(1) {
            ...
            // 根据操作类型,处理对应的读/写逻辑。
            listIter li;
            listNode *ln;
            listRewind(io_threads_list[id],&li);
            while((ln = listNext(&li))) {
                client *c = listNodeValue(ln);
                if (io_threads_op == IO_THREADS_OP_WRITE) {
                    writeToClient(c,0);
                } else if (io_threads_op == IO_THREADS_OP_READ) {
                    readQueryFromClient(c->conn);
                }
                ...
            }
            // 已完成任务,清空数据。
            listEmpty(io_threads_list[id]);
            io_threads_pending[id] = 0;
        }
    }

4. 测试

  • 压测设备:8 核心,16G 内存。
  • 配置。多线程模式测试,开启读写两个选项;单线程模式测试则会关闭。
 
    # redis.conf
    io-threads 4
    io-threads-do-reads yes
  • 压测命令。

命令逻辑已整理成脚本,放到 github,顺手录制了测试视频:压力测试 redis 多线程处理网络 I/O

 
    # 压测工具会模拟多个终端,防止超出限制,被停止。
    ulimit -n 16384
    
    # 可以设置对应的链接数/包体大小进行测试。
    ./redis-benchmark -c xxxx -r 1000000 -n 100000 -t set,get -q --threads 2  -d yyyy
  • 压测结果: 多线程没有单线程好 ^_^!。可能测试数据 key-value 中的 value 体量太小了未能看见应有的结果,所以我们要根据自身业务场景开启多线程网络 IO 功能。

202404052230216334.png


5. 参考


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文