2024-04-05
原文作者:文先生的博客 原文地址:http://wenfh2020.com/2020/04/06/ae-timer/

定时器是 redis 异步处理事件的一个十分重要的功能。

redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。

时间事件可以处理多个定时任务。


理解 redis 定时器,我们带着问题,看看 redis 是怎么处理的:

  • 定时器作用是什么。
  • 定时器实现原理。
  • 单进程里如何同时处理文件事件和时间事件。
  • 如何实现多定时任务。

1. 作用

redis 定时器核心逻辑在 serverCron 函数里。

  • 对设置了过期时间的数据进行检查回收。
  • 异步回收需要关闭的链接(socket)。
  • 检查 fork 的子进程是否已经关闭,处理回收的相关工作。
  • 检查内存数据是否符合 rdb 持久化快照落地条件,fork 子进程进行快照保存。
  • bgsave rdb 生成快照或 bgrewriteaof aof 重写延后操作。
  • 对需要扩容和缩容的哈希表(dict)进行数据迁移。
  • 集群里节点间的断线重连。
  • redis 服务的一些信息统计。

2. 定时器实现原理

redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。

2.1. 事件结构

  • 事件。
 
    // 时间事件
    typedef struct aeTimeEvent {
        long long id; /* time event identifier. */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        aeTimeProc *timeProc; /* 时钟到期事件触发回调处理函数。*/
        aeEventFinalizerProc *finalizerProc; /* 时间事件删除时,触发回调。*/
        void *clientData; /* 扩展参数,异步操作方便数据回调,在 timeProc 通过参数回传。*/
        struct aeTimeEvent *prev; /* 前一个节点。*/
        struct aeTimeEvent *next; /* 后一个节点。*/
    } aeTimeEvent;
    
    // 事件管理
    typedef struct aeEventLoop {
        ...
        long long timeEventNextId;  // 时间事件下一个 id (通过 ‘++’ 递增)
        ...
        aeTimeEvent *timeEventHead; // 时间事件链表。
        ...
    } aeEventLoop;
  • 事件循环。
 
    // 循环处理事件。
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
  • 事件回调函数。
 
    // 时间事件触发处理函数。
    typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
    
    // 时间事件处理完毕,被删除时,触发的回调处理。
    typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
  • 创建时间事件,时间事件通过双向链表管理,新的事件插入到链表头。
 
    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
            aeTimeProc *proc, void *clientData,
            aeEventFinalizerProc *finalizerProc) {
        // 事件 id 递增。
        long long id = eventLoop->timeEventNextId++;
        aeTimeEvent *te;
    
        te = zmalloc(sizeof(*te));
        if (te == NULL) return AE_ERR;
        te->id = id;
        // 设置到期时间。
        aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
        te->timeProc = proc;
        te->finalizerProc = finalizerProc;
        te->clientData = clientData;
        te->prev = NULL;
        te->next = eventLoop->timeEventHead;
        if (te->next)
            te->next->prev = te;
        eventLoop->timeEventHead = te;
        return id;
    }
  • 设置事件到期时间。
 
    static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
        long cur_sec, cur_ms, when_sec, when_ms;
    
        // 当前时间增加到期时间间隔。
        aeGetTime(&cur_sec, &cur_ms);
        when_sec = cur_sec + milliseconds/1000;
        when_ms = cur_ms + milliseconds%1000;
        if (when_ms >= 1000) {
            when_sec ++;
            when_ms -= 1000;
        }
        *sec = when_sec;
        *ms = when_ms;
    }

2.2. 定时器执行流程

  • redis 启动添加时钟处理事件。
 
    // ae.c
    int main(int argc, char **argv) {
        ...
        initServer();
        ...
        aeMain(server.el);
        ...
    }
    
    void initServer(void) {
        ...
        // 创建定时事件,绑定回调函数。
        if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            serverPanic("Can't create event loop timers.");
            exit(1);
        }
        ...
    }
    
    // 时钟回调处理函数
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        ...
    }
  • 事件循环处理。
 
    // 循环处理事件。
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    // 处理时间事件
    int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
        ...
         /* Check time events */
        if (flags & AE_TIME_EVENTS)
            // 处理时间事件
            processed += processTimeEvents(eventLoop);
        ...
    }

3. 单进程异步处理事件逻辑

进程通过循环,不停地处理时间事件和文件事件。

202404052230103731.png

  • 在进程的循环中不停地捞出文件和时间事件进行处理 aeProcessEvents
 
    // ae.c
    int main(int argc, char **argv) {
        ...
        aeMain(server.el);
        ...
    }
    
    // 循环处理事件。
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            ...
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    // 处理事件
    int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
        ...
        // 先搜索出最快到期的定时器,查看时间戳,文件事件要在定时器到期前从系统内核捞出来处理。
        shortest = aeSearchNearestTimer(eventLoop);
        ...
        // 处理文件事件,等待获取事件时间间隔不能太长,否则定时器事件处理要超时了。
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            ...
        }
        ...
        // 处理时间事件
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
        ...
    }
  • 对于文件事件,在 linux 系统中,redis 采用了 epoll 多路复用 I/O 事件驱动处理文件事件。通过 epoll_wait 捞出就绪事件进行处理。
 
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        ...
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        ...
    }
  • 先处理完文件事件,再处理时间事件。
 
    static int processTimeEvents(aeEventLoop *eventLoop) {
        ...
    }

4. 多定时任务

我们看看 processTimeEvents 是如何处理多个定时任务的:

202404052230109782.png

  • 遍历时间事件链表,先删除已处理,被标识需要删除的事件,再执行到期事件。
 
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;
        time_t now = time(NULL);
        ...
        eventLoop->lastTime = now;
    
        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;
        // 遍历时间链表,处理到期执行的时间事件。
        while(te) {
            long now_sec, now_ms;
            long long id;
    
            // 从时间事件链表中,删除时被标识为删除状态的事件。
            if (te->id == AE_DELETED_EVENT_ID) {
                aeTimeEvent *next = te->next;
                if (te->prev)
                    te->prev->next = te->next;
                else
                    eventLoop->timeEventHead = te->next;
                if (te->next)
                    te->next->prev = te->prev;
                if (te->finalizerProc)
                    te->finalizerProc(eventLoop, te->clientData);
                zfree(te);
                te = next;
                continue;
            }
            ...
            // 时间事件到期,执行事件。
            aeGetTime(&now_sec, &now_ms);
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms)) {
                int retval;
    
                id = te->id;
                // 执行时钟回调函数
                retval = te->timeProc(eventLoop, id, te->clientData);
                processed++;
                // 如果回调函数不返回 AE_NOMORE,重新更新该事件的到期时间,等待下次触发,否则标识事件为删除状态。
                if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    te->id = AE_DELETED_EVENT_ID;
                }
            }
            te = te->next;
        }
        return processed;
    }
  • 时间事件定时执行原理。

    到期事件回调处理函数 timeProc,例如 redis 对应的处理函数 serverCronserverCron 返回下一次到期的时间间隔,事件到期时间被(aeAddMillisecondsToNow)修改延后一个时间间隔,下一次到期再重新执行,从而达到时钟定期执行的效果。

 
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        ...
        // 返回时间间隔,单位毫秒。
        return 1000/server.hz;
    }
  • 定时器定时执行频率。

    很多后台任务都在定时器里执行,定时执行频率可以由配置文件的 hz 频率控制,时间事件 (1000/hz) 毫秒执行一次,频率越高,到期时间间隔越小,刷得越快,定时后台任务处理得越快,但是这样也会相应地损耗更多的系统资源,而且定时事件和文件事件是在同一个进程中进行的,这样肯定会影响到文件事件执行。一般情况下,系统默认一秒定时执行 10 次,也就是 hz == 10

 
    # redis.conf
    
    # 定时器事件刷新频率 1 < hz < 500,默认 10
    hz 10
  • 多定时任务

    事件里有不同的定时任务,它们定时执行的任务有快有慢,那对于多个定时任务,在时间事件触发后,它是如何处理的呢? 可以通过宏 run_with_period 处理。当时间间隔 _ms_ 很小的时候,每次触发时间事件,任务都会执行,否则通过记录事件触发的次数 server.cronloops++,当 hz 触发的事件时间间隔累积起来达到长时间间隔,就执行慢任务。(参考上图)

 
    struct redisServer {
        ...
        int cronloops;              /* Number of times the cron function run */
        ...
    }
    
    #define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
    
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        ...
        // 快任务,时间间隔 <= 时间事件触发时间间隔,执行。
        run_with_period(100) {
            ...
        }
        ...
        // 慢任务,定时时间间隔比较长的,需要通过 server.cronloops 累加达到长时间间隔,才会执行慢任务。
        run_with_period(5000) {
            ...
        }
    
        // 每次触发定时事件,都会执行。
        /* We need to do a few operations on clients asynchronously. */
        clientsCron();
    
        /* Handle background operations on Redis databases. */
        databasesCron();
        ...
        server.cronloops++;
        ...
    }

5. 效率

  • 添加事件,新的事件是直接添加到链表头,数据并没有排序。
  • 搜索最快过期的时钟事件,遍历查找,时间复杂度 O(N)。

基于上面两个问题,所以时钟过期处理效率是比较低的,但是 redis 里的基本没啥时钟事件,感觉暂时没有优化的必要。

 
    /* Search the first timer to fire.
     * This operation is useful to know how many time the select can be
     * put in sleep without to delay any event.
     * If there are no timers NULL is returned.
     *
     * Note that's O(N) since time events are unsorted.
     * Possible optimizations (not needed by Redis so far, but...):
     * 1) Insert the event in order, so that the nearest is just the head.
     *    Much better but still insertion or deletion of timers is O(N).
     * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
     */
    static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) {
        aeTimeEvent *te = eventLoop->timeEventHead;
        aeTimeEvent *nearest = NULL;
    
        while (te) {
            if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
                nearest = te;
            te = te->next;
        }
        return nearest;
    }

6. 小结

  • 定时器是由时间事件组成的,目前,redis 核心的定时事件只有一个,核心逻辑都在 serverCron 函数里。
  • 定时器定时触发频率受配置 hz 影响,可以通过修改该配置项,调整定时处理速度。
  • 多定时任务,其实在同一个定时器时间事件里处理。
  • 定时事件和文件事件都在同一个进程中执行,所以虽然定时事件时间精度是毫秒,但是不一定会十分精确,会受到文件事件处理影响。
  • 定时事件执行还会受到定时器里的任务处理影响,例如系统在一个时间段内很多过期数据,那么系统有可能会分配更多的时间片去处理。
  • 基于 redis 主逻辑都在单进程主线程中实现,所以定时任务不能执行太长的时间,所以很多复杂的定时任务,都会限制处理数量和处理时间。(例如 字典 dict 的扩容和缩容maxmemory 数据淘汰策略 等等。)

7. 后记

通读一个知识点后,知识在脑海中是模糊的,需要通过不同方式去强化清晰这个脑海中的映像。让抽象思维落地,我自己会经常将一些知识点图形化,这样一点一点地将知识碎片拼接起来。

202404052230121123.png

阅读全文