Redis 单机服务器实现
1. Redis 服务器
Redis服务器负责与客户端建立网络连接,处理发送的命令请求,在数据库中保存客户端执行命令所产生的数据,并且通过一系列资源管理措施来维持服务器自身的正常运转。本次主要剖析server.c
文件,本文主要介绍Redis服务器的一下几个实现:
- 命令的执行过程
- Redis服务器的周期性任务
- maxmemory的策略
- Redis服务器的main函数
其他的注释请上github查看:Redis 单机服务器实现源码注释
2. 命令的执行过程
Redis一个命令的完整执行过程如下:
- 客户端发送命令请求
- 服务器接收命令请求
- 服务器执行命令请求
- 将回复发送给客户端
关于命令接收与命令回复,在Redis 网络连接库剖析一文已经详细剖析过,本篇主要针对 第三步 ,也就是 服务器执行命令的过程 进行剖析。
服务器在接收到命令后,会将命令以对象的形式保存在服务器client
的参数列表robj **argv
中,因此服务器执行命令请求时,服务器已经读入了一套命令参数保存在参数列表中。执行命令的过程对应的函数是processCommand()
,源码如下:
// 如果client没有被关闭则返回C_OK,调用者可以继续执行其他的操作,否则返回C_ERR,表示client被销毁
int processCommand(client *c) {
// 如果是 quit 命令,则单独处理
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY; //设置client的状态为回复后立即关闭,返回C_ERR
return C_ERR;
}
// 从数据库的字典中查找该命令
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// 不存在的命令
if (!c->cmd) {
flagTransaction(c); //如果是事务状态的命令,则设置事务为失败
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return C_OK;
// 参数数量不匹配
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c); //如果是事务状态的命令,则设置事务为失败
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
/* Check if the user is authenticated */
// 如果服务器设置了密码,但是没有认证成功
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c); //如果是事务状态的命令,则设置事务为失败
addReply(c,shared.noautherr);
return C_OK;
}
// 如果开启了集群模式,则执行集群的重定向操作,下面的两种情况例外:
/*
1. 命令的发送是主节点服务器
2. 命令没有key
*/
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
// 从集群中返回一个能够执行命令的节点
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
// 返回的节点不合格
if (n == NULL || n != server.cluster->myself) {
// 如果是执行事务的命令,则取消事务
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
// 将事务状态设置为失败
flagTransaction(c);
}
// 执行client的重定向操作
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
// 如果服务器有最大内存的限制
if (server.maxmemory) {
// 按需释放一部分内存
int retval = freeMemoryIfNeeded();
// freeMemoryIfNeeded()函数之后需要冲洗从节点的输出缓冲区,这可能导致被释放的从节点是一个活跃的client
// 如果当前的client被释放,返回C_ERR
if (server.current_client == NULL) return C_ERR;
// 如果命令会耗费大量的内存但是释放内存失败
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
// 将事务状态设置为失败
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}
// 如果 BGSAVE 命令执行错误而且服务器是一个主节点,那么不接受写命令
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
{
// 将事务状态设置为失败
flagTransaction(c);
// 如果上一次执行AOF成功回复BGSAVE错误回复
if (server.aof_last_write_status == C_OK)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
return C_OK;
}
// 如果没有足够的良好的从节点而且用户配置了 min-slaves-to-write,那么不接受写命令
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
// 将事务状态设置为失败
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return C_OK;
}
// 如果这是一个只读的从节点服务器,则不接受写命令
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}
// 如果处于发布订阅模式,但是执行的不是发布订阅命令,返回
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
// 如果是从节点且和主节点断开了连接,不允许从服务器带有过期数据,返回
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
}
// 如果服务器处于载入状态,如果命令不是CMD_LOADING标识,则不执行,返回
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
}
// 如果lua脚本超时,限制执行一部分命令,如shutdown、scriptCommand
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return C_OK;
}
// 执行命令
// client处于事务环境中,但是执行命令不是exec、discard、multi和watch
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 除了上述的四个命令,其他的命令添加到事务队列中
queueMultiCommand(c);
addReply(c,shared.queued);
// 执行普通的命令
} else {
call(c,CMD_CALL_FULL);
// 保存写全局的复制偏移量
c->woff = server.master_repl_offset;
// 如果因为BLPOP而阻塞的命令已经准备好,则处理client的阻塞状态
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
我们总结出执行命令的大致过程:
- 查找命令。对应的代码是:
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
- 执行命令前的准备。对应这些判断语句。
- 执行命令。对应代码是:
call(c,CMD_CALL_FULL);
我们就大致就这三个过程详细解释。
2.1 查找命令
lookupCommand()
函数是对dictFetchValue(server.commands, name);
的封装。而这个函数的意思是:从server.commands
字典中查找name
命令。这个保存命令表的字典,键是命令的名称,值是命令表的地址。因此我们介绍服务器初始化时的一个操作,就是创建一张命令表。命令表代码简化表示如下:
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
......
};
我们只展示了命令表的两条,可以通过COMMAND COUNT
命令查看命令的个数。虽然只有两条,但是可以说明问题。
首先命令表是就是一个数组,数组的每个成员都是一个struct redisCommand
结构体,对每个数组成员都进行了初始化。我们一次对每个值进行分析:以GET
命令为例子。
-
char *name :命令的名字。对应
"get"
。 -
redisCommandProc *proc :命令实现的函数。对应
getCommand
。 -
int arity :参数个数,-N表示大于等于N。对应
2
。 -
char *sflags :命令的属性,用以下字符作为标识。对应
"rF"
。- w:写入命令,会修改数据库。
- r:读取命令,不会修改数据库。
- m:一旦执行会增加内存使用,如果内存短缺则不被允许执行。
- a:管理员命令,例如:SAVE or SHUTDOWN。
- p:发布订阅有关的命令。
- f:强制进行复制的命令,无视服务器的脏键。
- s:不能在脚本中执行的命令。
- R:随机命令。相同的键有相同的参数,在相同的数据库中,可能会有不同的结果。
- S:如果在脚本中调用,那么会对这个命令的输出进行一次排序。
- l:当载入数据库时,允许执行该命令。
- t:从节点服务器持有过期数据时,允许执行的命令。
- M:不能在 MONITOR 下自动传播的命令。
- k:为该命令执行一个隐式的 ASKING,所以在集群模式下,如果槽被标记为’importing’,那这个命令会被接收。
- F:快速执行的命令。时间复杂度为O(1) or O(log(N))的命令只要内核调度为Redis分配时间片,那么就不应该在执行时被延迟。
-
int flags :sflags的二进制标识形式,可以通过位运算进行组合。对应
0
。 -
redisGetKeysProc *getkeys_proc :从命令中获取键的参数,是一个可选的功能,一般用于三个字段不够执行键的参数的情况。对应
NULL
。 -
int firstkey :第一个参数是 key。对应
1
。 -
int lastkey :最后一个参数是 key。对应
1
。 -
int keystep :从第一个 key 到最后一个 key 的步长。MSET 的步长是 2 因为:key,val,key,val,…。对应
1
。 -
long long microseconds :记录执行命令的耗费总时长。对应
0
。 -
long long calls :记录命令被执行的总次数。对应
0
。
当从命令表中找到命令后,会将找到的命令的地址,返回给struct redisCommand *cmd, *lastcmd;
这两个指针保存起来。到此查找命令的操作就完成。
2.2 执行命令前的准备
此时,命令已经在命令表中查找到,并且保存在了对应的指针中。但是真正执行前,还进行了许多的情况的判断。我们简单列举几种。
- 首先就是判断命令的参数是否匹配。
- 检查服务器的认证是否通过。
- 集群模式下的判断。
- 服务器最大内存限制是否通过。
- 某些情况下,不接受写命令。
- 发布订阅模式。
- 是否是lua脚本中的命令。
- 等等……
所以,命令执行的过程还是很复杂的,简单总结一句: 命令不易,何况人生。
2.3 执行命令
执行命令调用了call(c,CMD_CALL_FULL)
函数,该函数是执行命令的核心。但是不用想,这个函数一定是对回调函数c->cmd->proc(c)
的封装,因为proc
指向命令的实现函数。我们贴出该函数的代码:
void call(client *c, int flags) {
long long dirty, start, duration;
int client_old_flags = c->flags; //备份client的flags
// 将命令发送给 MONITOR
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
// 清除一些需要按照命令需求设置的标志,以防干扰
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 初始化Redis操作数组,用来追加命令的传播
redisOpArrayInit(&server.also_propagate);
/* Call the command. */
// 备份脏键数
dirty = server.dirty;
// 获取执行命令的开始时间
start = ustime();
// 执行命令
c->cmd->proc(c);
// 命令的执行时间
duration = ustime()-start;
// 命令修改的键的个数
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
// 当执行 EVAL 命令时正在加载AOF,而且不希望Lua调用的命令进入slowlog或填充统计信息
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); //取消慢查询和记录统计信息的标志
// 如果函数调用者是Lua脚本,且命令的flags或客户端的flags指定了强制传播,我们要强制EVAL调用者传播脚本
if (c->flags & CLIENT_LUA && server.lua_caller) {
// 如果指定了强制将命令传播到从节点
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL; //强制执行lua脚本的client要传播命令到从节点
// 如果指定了强制将节点传播到AOF中
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF; //强制执行lua脚本的client要传播命令到AOF文件
}
// 命令的flags指定了慢查询标志,要将总的统计信息推入慢查询日志中
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
// 记录将延迟事件和延迟时间关联到延迟诊断的字典中
latencyAddSampleIfNeeded(latency_event,duration/1000);
// 将总的统计信息推入慢查询日志中
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
}
// 命令的flags指定了CMD_CALL_STATS,更新命令的统计信息
if (flags & CMD_CALL_STATS) {
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}
// 如果client设置了强制传播的标志或修改了数据集,则将命令发送给从节点服务器或追加到AOF中
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
// 保存传播的标志,初始化为空
int propagate_flags = PROPAGATE_NONE;
// 如果命令修改了数据库中的键,则要传播到AOF和从节点中
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
// 如果client设置了强制AOF和复制的标志,则设置传播的标志
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
// 如果client的flags设置了CLIENT_PREVENT_REPL/AOF_PROP,表示阻止命令的传播到从节点或AOF,则取消传播对应标志
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
// 如果至少设置了一种传播,则执行相应传播命令操作
if (propagate_flags != PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
// 清除一些需要按照命令需求设置的标志,以防干扰
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 恢复client原始的flags
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 传播追加在Redis操作数组中的命令
if (server.also_propagate.numops) {
int j;
redisOp *rop;
// 如果命令的flags设置传播的标志
if (flags & CMD_CALL_PROPAGATE) {
// 遍历所有的命令
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
// 执行相应传播命令操作
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
}
// 释放Redis操作数组
redisOpArrayFree(&server.also_propagate);
}
// 命令执行的次数加1
server.stat_numcommands++;
}
执行命令时,可以指定一个flags
。这个flags
是用于 执行完命令之后的一些后续工作 。我们说明这些flags
的含义:
CMD_CALL_NONE:没有指定flags
CMD_CALL_SLOWLOG:检查命令的执行速度,如果需要记录在慢查询日志中
CMD_CALL_STATS:记录命令的统计信息
CMD_CALL_PROPAGATE_AOF:如果client设置了强制传播的标志或修改了数据集,则将命令追加到AOF文件中
CMD_CALL_PROPAGATE_REPL:如果client设置了强制传播的标志或修改了数据集,则将命令发送给从节点服务器中
CMD_CALL_PROPAGATE:如果client设置了强制传播的标志或修改了数据集,则将命令发送给从节点服务器或追加到AOF中
CMD_CALL_FULL:包含以上所有的含义
执行命令c->cmd->proc(c)
就相当于执行了命令实现的函数,然后会在执行完成后,由这些函数产生相应的命令回复,根据回复的大小,会将回复保存在 输出缓冲区 buf
或 回复链表repl
中。然后服务器会调用writeToClient()
函数来将回复写到fd
中。详细请看:Redis 网络连接库剖析。
至此,一条命令的执行过程就很清楚明了了。
3. Redis服务器的周期性任务
我们曾经在Redis 事件处理实现一文中说到,Redis的事件分为文件事件(file event)
和时间事件(time event)
。时间事件虽然是晚于文件事件
执行,但是会每隔100ms
都会执行一次。话不多说直接上代码:Redis 单机服务器实现源码注释
// 使用一个宏定义:run_with_period(milliseconds) { .... },实现一部分代码有次数限制的被执行
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
// 如果设置了看门狗,则在过期时间内,递达一个 SIGALRM 信号
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
// 设置服务器的时间缓存
updateCachedTime();
// 更新服务器的一些统计值
run_with_period(100) {
// 命令执行的次数
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
// 从网络读到的字节数
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
server.stat_net_input_bytes);
// 已经写到网络的字节数
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes);
}
// 服务器的LRU时间表示位数为24位,因此最长表示2^24秒,大约1.5年,只要在1.5年内,该对象被访问,那么就不会出现对象的LRU时间比服务器的时钟还要年轻的现象
// LRU_CLOCK_RESOLUTION 可以改变LRU时间的精度
// 获取服务器的LRU时钟
server.lruclock = getLRUClock();
// 更新服务器的最大内存使用量峰值
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
// 更新常驻内存的大小
server.resident_set_size = zmalloc_get_rss();
// 安全的关闭服务器
if (server.shutdown_asap) {
// 关闭服务器前的准备动作,成功则关闭服务器
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
// 失败则打印日志
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
// 撤销关闭服务器标志
server.shutdown_asap = 0;
}
// 打印数据库的信息到日志中
run_with_period(5000) {
// 遍历数据库
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
// 获取当前数据库的键值对字典的槽位数,键值对字典已使用的数量,过期键字典已使用的数量
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
// 打印到日志中
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}
// 如果服务器不在哨兵模式下,那么周期性打印一些连接client的信息到日志中
if (!server.sentinel_mode) {
run_with_period(5000) {
serverLog(LL_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
// 执行client的周期性任务
clientsCron();
// 执行数据库的周期性任务
databasesCron();
// 如果当前没有正在进行RDB和AOF持久化操作,且AOF重写操作被提上了日程,那么在后台执行AOF的重写操作
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
// 如果正在进行RDB或AOF重写等操作,那么等待接收子进程发来的信息
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
// 接收所有子进程发送的信号,非阻塞
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
// 获取退出码
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
// 判断子进程是否因为信号而终止,是的话,取得子进程因信号而中止的信号码
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
// 子进程没有退出,还在进行RDB或AOF重写等操作
if (pid == -1) {
// 打印日志
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
// RDB持久化完成
} else if (pid == server.rdb_child_pid) {
// 将RDB文件写入磁盘或网络中
backgroundSaveDoneHandler(exitcode,bysignal);
// AOF持久化完成
} else if (pid == server.aof_child_pid) {
// 将重写缓冲区的命令追加AOF文件中,且进行同步操作
backgroundRewriteDoneHandler(exitcode,bysignal);
// 其他子进程,打印日志
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
// 更新能否resize哈希的策略
updateDictResizePolicy();
}
// 没有正在进行RDB或AOF重写等操作,那么检查是否需要执行
} else {
// 遍历save命令的参数数组
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
// 数据库的键被修改的次数大于SAVE命令参数指定的修改次数,且已经过了SAVE命令参数指定的秒数
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
// 进行 BGSAVE 操作
rdbSaveBackground(server.rdb_filename);
break;
}
}
// 是否触发AOF重写操作
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
// 上一次重写后的大小
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
// AOF文件增长的百分比
long long growth = (server.aof_current_size*100/base) - 100;
// 大于设置的百分比100则进行AOF后台重写
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
// 将AOF缓存冲洗到磁盘中
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
// 当AOF重写操作,同样将重写缓冲区的数据刷新到AOF文件中
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
// 释放被设置为异步释放的client
freeClientsInAsyncFreeQueue();
// 解除client的暂停状态
clientsArePaused(); /* Don't check return value, just use the side effect. */
// 周期性执行复制的任务
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
// 周期性执行集群任务
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
//周期性执行哨兵任务
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
// 清理过期的被缓存的sockets连接
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
// 如果 BGSAVE 被提上过日程,那么进行BGSAVE操作,因为AOF重写操作在更新
// 注意:此代码必须在上面的replicationCron()调用之后,确保在重构此文件以保持此顺序时。 这是有用的,因为我们希望优先考虑RDB节省的复制
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
// 更新执行BGSAVE,成功则清除rdb_bgsave_scheduled标志
if (rdbSaveBackground(server.rdb_filename) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
// 周期loop计数器加1
server.cronloops++;
// 返回周期,默认为100ms
return 1000/server.hz;
}
我们也是大致总结列出部分:
- 主动删除过期的键(也可以在读数据库时被动删除)
- 喂看门狗 watchdog
- 更新一些统计值
- 渐进式rehash
- 触发 BGSAVE / AOF 的重写操作,并处理子进程的中断
- 不同状态的client的超时
- 复制重连
- 等……
我们重点看两个函数,一个是关于 客户端资源管理 的clientsCron()
,一个是关于 数据库资源管理 的databasesCron()
。
3.1客户端资源管理
服务器要定时检查client是否与服务器有交互,如果超过了设置的限制时间,则要释放client所占用的资源。具体的函数是clientsCronHandleTimeout()
,它被clientsCron()
函数所调用。
// 检查超时,如果client中断超时返回非零值,函数获取当前时间作为参数因为他被一个循环中调用多次。所以调用gettimeofday()为每一次迭代都是昂贵的,而没有任何实际的效益
// client被关闭则返回1,没有关闭返回0
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
// 当前时间,单位秒
time_t now = now_ms/1000;
// 当前时间 - client上一次和服务器交互的时间 如果大于 服务器中设置client超过的最大时间
// 不检查这四类client的超时时间:slaves从节点服务器、masters主节点服务器、BLPOP被阻塞的client、订阅状态的client
if (server.maxidletime &&
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
(now - c->lastinteraction > server.maxidletime))
{
serverLog(LL_VERBOSE,"Closing idle client");
freeClient(c);
return 1;
// 如果client处于BLPOP被阻塞
} else if (c->flags & CLIENT_BLOCKED) {
// 如果阻塞的client的超时时间已经到达
if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
// 回复client一个空回复
replyToBlockedClientTimedOut(c);
// 接触client的阻塞状态
unblockClient(c);
// 如果服务器处于集群模式
} else if (server.cluster_enabled) {
// 重定向client的阻塞到其他的服务器
if (clusterRedirectBlockedClientIfNeeded(c))
// 解除阻塞
unblockClient(c);
}
}
return 0;
}
3.2 数据库资源管理
服务器要定时检查数据库的输入缓冲区是否可以resize
,以节省内存资源。而resize输入缓冲区的两个条件:
- 输入缓冲区的大小大于32K以及超过缓冲区的峰值的2倍。
- client超过时间大于2秒,且输入缓冲区的大小超过1k
实现的函数是clientsCronResizeQueryBuffer()
,被databasesCron()
函数所调用。
// resize客户端的输入缓冲区
int clientsCronResizeQueryBuffer(client *c) {
// 获取输入缓冲区的大小
size_t querybuf_size = sdsAllocSize(c->querybuf);
// 计算服务器对于client的空转时间,也就是client的超时时间
time_t idletime = server.unixtime - c->lastinteraction;
// resize输入缓冲区的两个条件:
// 1. 输入缓冲区的大小大于32K以及超过缓冲区的峰值的2倍
// 2. client超过时间大于2秒,且输入缓冲区的大小超过1k
if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
(querybuf_size/(c->querybuf_peak+1)) > 2) ||
(querybuf_size > 1024 && idletime > 2))
{
// 只有输入缓冲区的未使用大小超过1k,则会释放未使用的空间
if (sdsavail(c->querybuf) > 1024) {
c->querybuf = sdsRemoveFreeSpace(c->querybuf);
}
}
// 清空输入缓冲区的峰值
c->querybuf_peak = 0;
return 0;
}
4. maxmemory的策略
Redis 服务器对内存使用会有一个server.maxmemory
的限制,如果超过这个限制,就要通过删除一些键空间来释放一些内存,具体函数对应freeMemoryIfNeeded()
。Redis 单机服务器实现源码注释
释放内存时,可以指定不同的策略。策略保存在maxmemory_policy
中,他可以指定以下的几个值:
#define MAXMEMORY_VOLATILE_LRU 0
#define MAXMEMORY_VOLATILE_TTL 1
#define MAXMEMORY_VOLATILE_RANDOM 2
#define MAXMEMORY_ALLKEYS_LRU 3
#define MAXMEMORY_ALLKEYS_RANDOM 4
#define MAXMEMORY_NO_EVICTION 5
可以看出主要分为三种,
- LRU:优先删除最近最少使用的键。
- TTL:优先删除生存时间最短的键。
- RANDOM:随机删除。
而ALLKEYS
和VOLATILE
的不同之处就是要确定是从数据库的 键值对字典 还是 过期键字典 中删除。
了解了以上这些,我们贴出代码:
// 按需释放内存空间
int freeMemoryIfNeeded(void) {
size_t mem_used, mem_tofree, mem_freed;
int slaves = listLength(server.slaves);
mstime_t latency, eviction_latency;
// 计算出服务器总的内存使用量,但是有两部分要减去
/*
1、从节点的输出缓冲区
2、AOF缓冲区
*/
mem_used = zmalloc_used_memory();
// 存在从节点
if (slaves) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = listNodeValue(ln);
// 获取当前从节点的输出缓冲区的大小,不包含静态的固定回复缓冲区,因为他总被分配
unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
// 减去当前从节点的输出缓冲区的大小
if (obuf_bytes > mem_used)
mem_used = 0;
else
mem_used -= obuf_bytes;
}
}
// 如果开启了AOF操作
if (server.aof_state != AOF_OFF) {
// 减去AOF缓冲区的大小
mem_used -= sdslen(server.aof_buf);
// 减去AOF重写缓冲区的大小
mem_used -= aofRewriteBufferSize();
}
// 如果没有超过服务器设置的最大内存限制,则返回C_OK
if (mem_used <= server.maxmemory) return C_OK;
// 如果内存回收策略为不回收,则返回C_ERR
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
return C_ERR; /* We need to free memory, but policy forbids. */
// 计算需要回收的大小
mem_tofree = mem_used - server.maxmemory;
// 已回收的大小
mem_freed = 0;
// 设置回收延迟检测开始的时间
latencyStartMonitor(latency);
// 循环回收,直到到达需要回收大小
while (mem_freed < mem_tofree) {
int j, k, keys_freed = 0;
// 遍历所有的数据库
for (j = 0; j < server.dbnum; j++) {
long bestval = 0; /* just to prevent warning */
sds bestkey = NULL;
dictEntry *de;
redisDb *db = server.db+j;
dict *dict;
// 如果回收策略有ALLKEYS_LRU或RANDOM,从键值对字典中选择回收
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
{
// 则从键值对字典中选择回收的键。选择样品字典
dict = server.db[j].dict;
} else {
// 否则从过期键字典中选择回收的键。选择样品字典
dict = server.db[j].expires;
}
if (dictSize(dict) == 0) continue; //跳过空字典
/* volatile-random and allkeys-random policy */
// 如果回收策略有 ALLKEYS_RANDOM 或 VOLATILE_RANDOM,则是随机挑选
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
// 随机返回一个key
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
}
/* volatile-lru and allkeys-lru policy */
// 如果回收策略有 ALLKEYS_LRU 或 VOLATILE_LRU,则使用LRU策略
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
{
// 回收池
struct evictionPoolEntry *pool = db->eviction_pool;
while(bestkey == NULL) {
// evictionPoolPopulate()用于在每次我们想要过期一个键的时候,用几个节点填充evictionPool。 空闲时间小于当前key的之一的key被添加。 如果有free的节点,则始终添加key。 我们按升序插入key,所以空闲时间越短的键在左边,右边的空闲时间越长。
// 从样品字典dict中随机选择样品
evictionPoolPopulate(dict, db->dict, db->eviction_pool);
// 从空转时间最长的开始遍历
for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
// 跳过空位置
if (pool[k].key == NULL) continue;
// 从样品字典dict中查找当前key
de = dictFind(dict,pool[k].key);
// 从收回池中删除
sdsfree(pool[k].key);
// 释放位置
memmove(pool+k,pool+k+1,
sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
// 重置key和空转时间
pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;
// 如果从样品字典中可以找到,则保存键
if (de) {
bestkey = dictGetKey(de);
break;
// 没找到,则继续找下一个样品空间所保存的键
} else {
/* Ghost... */
continue;
}
}
// 如果当前选出的所有的样品都没找到,则重新选择一批样品,知道找到一个可以释放的键
}
}
/* volatile-ttl */
// 如果回收策略有 VOLATILE_TTL,则选择生存时间最短的键
else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
// 抽样个数为maxmemory_samples个
for (k = 0; k < server.maxmemory_samples; k++) {
sds thiskey;
long thisval;
// 返回一个键,获取他的生存时间
de = dictGetRandomKey(dict);
thiskey = dictGetKey(de);
thisval = (long) dictGetVal(de);
// 如果当前键的生存时间更短,则保存
if (bestkey == NULL || thisval < bestval) {
bestkey = thiskey;
bestval = thisval;
}
}
}
/* Finally remove the selected key. */
// 删除所有被选择的键
if (bestkey) {
long long delta;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
// 当一个键在主节点中过期时,主节点会发送del命令给从节点和AOF文件
propagateExpire(db,keyobj);
// 单独计算dbDelete()所释放的空间大小, 在AOF和复制链接中传播DEL的内存实际上大于我们释放的key的内存
// 但是无法解释,窦泽不会退出循环
// AOF和输出缓冲区的内存最终被释放,所以我们只关心键空间使用的内存
delta = (long long) zmalloc_used_memory();
// 设置删除key对象的开始时间
latencyStartMonitor(eviction_latency);
dbDelete(db,keyobj);
// 保存删除key对象时间
latencyEndMonitor(eviction_latency);
// 添加到延迟诊断字典中
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
// 删除嵌套的延迟事件
latencyRemoveNestedEvent(latency,eviction_latency);
// 计算删除这个键的大小
delta -= (long long) zmalloc_used_memory();
// 更新内存释放量
mem_freed += delta;
// 服务器总的回收键的个数计数器加1
server.stat_evictedkeys++;
// 事件通知
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
// 释放键对象
decrRefCount(keyobj);
// 释放键的个数加1
keys_freed++;
// 如果有从节点,则刷新所有的输出缓冲区数据
if (slaves) flushSlavesOutputBuffers();
}
}
// 如果所有数据库都没有释放键,返回C_ERR
if (!keys_freed) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
return C_ERR; /* nothing to free... */
}
}
// 计算回收延迟的时间
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
return C_OK;
}
5. Redis服务器的main函数
Redis 服务器的mian()
主要执行了一下操作:Redis 单机服务器实现源码注释
- 初始化服务器状态
- 载入服务器的配置
- 初始化服务器数据结构
- 载入持久化文件还原数据库状态
- 执行事件循环
int main(int argc, char **argv) {
struct timeval tv;
int j;
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
// 本函数用来配置地域的信息,设置当前程序使用的本地化信息,LC_COLLATE 配置字符串比较
setlocale(LC_COLLATE,"");
// 设置线程安全
zmalloc_enable_thread_safeness();
// 设置内存溢出的处理函数
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
// 初始化随机数发生器
srand(time(NULL)^getpid());
// 保存当前信息
gettimeofday(&tv,NULL);
// 设置哈希函数的种子
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
// 检查开启哨兵模式的两种方式
server.sentinel_mode = checkForSentinelMode(argc,argv);
// 初始化服务器配置
initServerConfig();
// 设置可执行文件的绝对路径
server.executable = getAbsolutePath(argv[0]);
// 分配执行executable文件的参数列表的空间
server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
server.exec_argv[argc] = NULL;
// 保存当前参数
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
// 如果已开启哨兵模式
if (server.sentinel_mode) {
// 初始化哨兵的配置
initSentinelConfig();
initSentinel();
}
// 检查是否执行"redis-check-rdb"检查程序
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv); //该函数不会返回
// 解析参数
if (argc >= 2) {
j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
char *configfile = NULL;
/* Handle special options --help and --version */
// 指定了打印版本信息,然后退出
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
// 执行帮助信息,然后退出
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
// 执行内存测试程序
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
/* First argument is the config file name? */
// 如果第1个参数不是'-',那么是配置文件
if (argv[j][0] != '-' || argv[j][1] != '-') {
configfile = argv[j];
// 设置配置文件的绝对路径
server.configfile = getAbsolutePath(configfile);
/* Replace the config file in server.exec_argv with
* its absoulte path. */
zfree(server.exec_argv[j]);
// 设置可执行的参数列表
server.exec_argv[j] = zstrdup(server.configfile);
j++;
}
// 解析指定的对象
while(j != argc) {
// 如果是以'-'开头
if (argv[j][0] == '-' && argv[j][1] == '-') {
/* Option name */
// 跳过"--check-rdb"
if (!strcmp(argv[j], "--check-rdb")) {
/* Argument has no options, need to skip for parsing. */
j++;
continue;
}
// 每个选项之间用'\n'隔开
if (sdslen(options)) options = sdscat(options,"\n");
// 将选项追加在sds中
options = sdscat(options,argv[j]+2);
// 选项和参数用 " "隔开
options = sdscat(options," ");
} else {
/* Option argument */
// 追加选项参数
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
// 如果开启哨兵模式,哨兵模式配置文件不正确
if (server.sentinel_mode && configfile && *configfile == '-') {
serverLog(LL_WARNING,
"Sentinel config from STDIN not allowed.");
serverLog(LL_WARNING,
"Sentinel needs config file on disk to save state. Exiting...");
exit(1);
}
// 重置save命令的参数
resetServerSaveParams();
// 载入配置文件
loadServerConfig(configfile,options);
sdsfree(options);
} else {
serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
}
// 是否被监视
server.supervised = redisIsSupervised(server.supervised_mode);
// 是否以守护进程的方式运行
int background = server.daemonize && !server.supervised;
if (background) daemonize();
// 初始化服务器
initServer();
// 创建保存pid的文件
if (background || server.pidfile) createPidFile();
// 为服务器进程设置标题
redisSetProcTitle(argv[0]);
// 打印Redis的logo
redisAsciiArt();
// 检查backlog队列
checkTcpBacklogSettings();
// 如果不是哨兵模式
if (!server.sentinel_mode) {
/* Things not needed when running in Sentinel mode. */
serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
// 打印内存警告
linuxMemoryWarnings();
#endif
// 从AOF文件或RDB文件载入数据
loadDataFromDisk();
// 如果开启了集群模式
if (server.cluster_enabled) {
// 集群模式下验证载入的数据
if (verifyClusterConfigWithData() == C_ERR) {
serverLog(LL_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
// 打印端口号
if (server.ipfd_count > 0)
serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
// 打印本地套接字fd
if (server.sofd > 0)
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else {
// 开启哨兵模式,哨兵模式和集群模式只能开启一种
sentinelIsRunning();
}
/* Warning the user about suspicious maxmemory setting. */
// 最大内存限制是否配置正确
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
// 进入事件循环之前执行beforeSleep()函数
aeSetBeforeSleepProc(server.el,beforeSleep);
// 运行事件循环,一直到服务器关闭
aeMain(server.el);
// 服务器关闭,删除事件循环
aeDeleteEventLoop(server.el);
return 0;
}