定时器是 redis 异步处理事件的一个十分重要的功能。
redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。
时间事件可以处理多个定时任务。
理解 redis 定时器,我们带着问题,看看 redis 是怎么处理的:
- 定时器作用是什么。
- 定时器实现原理。
- 单进程里如何同时处理文件事件和时间事件。
- 如何实现多定时任务。
1. 作用
redis 定时器核心逻辑在 serverCron
函数里。
- 对设置了过期时间的数据进行检查回收。
- 异步回收需要关闭的链接(socket)。
- 检查 fork 的子进程是否已经关闭,处理回收的相关工作。
- 检查内存数据是否符合 rdb 持久化快照落地条件,fork 子进程进行快照保存。
bgsave
rdb 生成快照或bgrewriteaof
aof 重写延后操作。- 对需要扩容和缩容的哈希表(dict)进行数据迁移。
- 集群里节点间的断线重连。
- redis 服务的一些信息统计。
- …
2. 定时器实现原理
redis 定时器功能由多个时间事件组成,事件由一个双向链表维护。
2.1. 事件结构
- 事件。
// 时间事件
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc; /* 时钟到期事件触发回调处理函数。*/
aeEventFinalizerProc *finalizerProc; /* 时间事件删除时,触发回调。*/
void *clientData; /* 扩展参数,异步操作方便数据回调,在 timeProc 通过参数回传。*/
struct aeTimeEvent *prev; /* 前一个节点。*/
struct aeTimeEvent *next; /* 后一个节点。*/
} aeTimeEvent;
// 事件管理
typedef struct aeEventLoop {
...
long long timeEventNextId; // 时间事件下一个 id (通过 ‘++’ 递增)
...
aeTimeEvent *timeEventHead; // 时间事件链表。
...
} aeEventLoop;
- 事件循环。
// 循环处理事件。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
- 事件回调函数。
// 时间事件触发处理函数。
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
// 时间事件处理完毕,被删除时,触发的回调处理。
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
- 创建时间事件,时间事件通过双向链表管理,新的事件插入到链表头。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc) {
// 事件 id 递增。
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
// 设置到期时间。
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
- 设置事件到期时间。
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
long cur_sec, cur_ms, when_sec, when_ms;
// 当前时间增加到期时间间隔。
aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds/1000;
when_ms = cur_ms + milliseconds%1000;
if (when_ms >= 1000) {
when_sec ++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
2.2. 定时器执行流程
- redis 启动添加时钟处理事件。
// ae.c
int main(int argc, char **argv) {
...
initServer();
...
aeMain(server.el);
...
}
void initServer(void) {
...
// 创建定时事件,绑定回调函数。
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
...
}
// 时钟回调处理函数
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
}
- 事件循环处理。
// 循环处理事件。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
// 处理时间事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
...
/* Check time events */
if (flags & AE_TIME_EVENTS)
// 处理时间事件
processed += processTimeEvents(eventLoop);
...
}
3. 单进程异步处理事件逻辑
进程通过循环,不停地处理时间事件和文件事件。
- 在进程的循环中不停地捞出文件和时间事件进行处理
aeProcessEvents
。
// ae.c
int main(int argc, char **argv) {
...
aeMain(server.el);
...
}
// 循环处理事件。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
...
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
// 处理事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
...
// 先搜索出最快到期的定时器,查看时间戳,文件事件要在定时器到期前从系统内核捞出来处理。
shortest = aeSearchNearestTimer(eventLoop);
...
// 处理文件事件,等待获取事件时间间隔不能太长,否则定时器事件处理要超时了。
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
...
}
...
// 处理时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
...
}
- 对于文件事件,在 linux 系统中,redis 采用了
epoll
多路复用 I/O 事件驱动处理文件事件。通过epoll_wait
捞出就绪事件进行处理。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
...
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
...
}
- 先处理完文件事件,再处理时间事件。
static int processTimeEvents(aeEventLoop *eventLoop) {
...
}
4. 多定时任务
我们看看 processTimeEvents
是如何处理多个定时任务的:
- 遍历时间事件链表,先删除已处理,被标识需要删除的事件,再执行到期事件。
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
...
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
// 遍历时间链表,处理到期执行的时间事件。
while(te) {
long now_sec, now_ms;
long long id;
// 从时间事件链表中,删除时被标识为删除状态的事件。
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
...
// 时间事件到期,执行事件。
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms)) {
int retval;
id = te->id;
// 执行时钟回调函数
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
// 如果回调函数不返回 AE_NOMORE,重新更新该事件的到期时间,等待下次触发,否则标识事件为删除状态。
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
-
时间事件定时执行原理。
到期事件回调处理函数
timeProc
,例如 redis 对应的处理函数serverCron
。serverCron
返回下一次到期的时间间隔,事件到期时间被(aeAddMillisecondsToNow
)修改延后一个时间间隔,下一次到期再重新执行,从而达到时钟定期执行的效果。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
// 返回时间间隔,单位毫秒。
return 1000/server.hz;
}
-
定时器定时执行频率。
很多后台任务都在定时器里执行,定时执行频率可以由配置文件的
hz
频率控制,时间事件 (1000/hz) 毫秒执行一次,频率越高,到期时间间隔越小,刷得越快,定时后台任务处理得越快,但是这样也会相应地损耗更多的系统资源,而且定时事件和文件事件是在同一个进程中进行的,这样肯定会影响到文件事件执行。一般情况下,系统默认一秒定时执行 10 次,也就是hz == 10
。
# redis.conf
# 定时器事件刷新频率 1 < hz < 500,默认 10
hz 10
-
多定时任务
事件里有不同的定时任务,它们定时执行的任务有快有慢,那对于多个定时任务,在时间事件触发后,它是如何处理的呢? 可以通过宏
run_with_period
处理。当时间间隔_ms_
很小的时候,每次触发时间事件,任务都会执行,否则通过记录事件触发的次数server.cronloops++
,当hz
触发的事件时间间隔累积起来达到长时间间隔,就执行慢任务。(参考上图)
struct redisServer {
...
int cronloops; /* Number of times the cron function run */
...
}
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
// 快任务,时间间隔 <= 时间事件触发时间间隔,执行。
run_with_period(100) {
...
}
...
// 慢任务,定时时间间隔比较长的,需要通过 server.cronloops 累加达到长时间间隔,才会执行慢任务。
run_with_period(5000) {
...
}
// 每次触发定时事件,都会执行。
/* We need to do a few operations on clients asynchronously. */
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron();
...
server.cronloops++;
...
}
5. 效率
- 添加事件,新的事件是直接添加到链表头,数据并没有排序。
- 搜索最快过期的时钟事件,遍历查找,时间复杂度 O(N)。
基于上面两个问题,所以时钟过期处理效率是比较低的,但是 redis 里的基本没啥时钟事件,感觉暂时没有优化的必要。
/* Search the first timer to fire.
* This operation is useful to know how many time the select can be
* put in sleep without to delay any event.
* If there are no timers NULL is returned.
*
* Note that's O(N) since time events are unsorted.
* Possible optimizations (not needed by Redis so far, but...):
* 1) Insert the event in order, so that the nearest is just the head.
* Much better but still insertion or deletion of timers is O(N).
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
*/
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) {
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
while (te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}
6. 小结
- 定时器是由时间事件组成的,目前,redis 核心的定时事件只有一个,核心逻辑都在
serverCron
函数里。 - 定时器定时触发频率受配置
hz
影响,可以通过修改该配置项,调整定时处理速度。 - 多定时任务,其实在同一个定时器时间事件里处理。
- 定时事件和文件事件都在同一个进程中执行,所以虽然定时事件时间精度是毫秒,但是不一定会十分精确,会受到文件事件处理影响。
- 定时事件执行还会受到定时器里的任务处理影响,例如系统在一个时间段内很多过期数据,那么系统有可能会分配更多的时间片去处理。
- 基于 redis 主逻辑都在单进程主线程中实现,所以定时任务不能执行太长的时间,所以很多复杂的定时任务,都会限制处理数量和处理时间。(例如 字典 dict 的扩容和缩容,maxmemory 数据淘汰策略 等等。)
7. 后记
通读一个知识点后,知识在脑海中是模糊的,需要通过不同方式去强化清晰这个脑海中的映像。让抽象思维落地,我自己会经常将一些知识点图形化,这样一点一点地将知识碎片拼接起来。