redis 的数据以 key-value 形式存储,其中 value 往往以对象(redisObject)的形式存在。
1. 对象
1.1. 数据结构
/* Generic Redis value object: a small header (type, encoding, access
 * bookkeeping, reference count) plus a pointer to the actual payload. */
typedef struct redisObject {
// Object type (4-bit field)
unsigned type:4;
// Encoding used for the payload (4-bit field)
unsigned encoding:4;
// Last-access bookkeeping for this object
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
// Reference count
int refcount;
// Pointer to the object's data
void *ptr;
} robj;
1.2. 类型(type)
/* Object type constants (decrRefCount in this file switches on o->type
 * using these values).
 * NOTE(review): decrRefCount also handles OBJ_MODULE and OBJ_STREAM, which
 * are not defined in this excerpt. */
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */
1.3. 编码(encoding)
/* Object encodings. Some kinds of objects, like Strings and Hashes, can be
 * internally represented in multiple ways. The 'encoding' field of the object
 * is set to one of these values to record which representation is in use. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
1.4. 引用计数(refcount)
为了内存能重复使用,节省内存空间,redis 数据对象可能被共享,通过引用计数,统计对象被引用的次数。在内存回收时,当引用计数为 1,说明没有其它逻辑正在使用它,可以进行回收销毁。
object.c
/* Take one more reference on 'o'.
 *
 * An object whose refcount equals OBJ_SHARED_REFCOUNT is left untouched;
 * every other object has its counter incremented by one. */
void incrRefCount(robj *o) {
    if (o->refcount == OBJ_SHARED_REFCOUNT) return;
    o->refcount++;
}
/* Drop one reference on 'o'.
 *
 * When the caller holds the last reference (refcount == 1) the payload is
 * released through the type-specific free routine and the object header
 * itself is freed. Otherwise the counter is decremented, except for objects
 * whose refcount is OBJ_SHARED_REFCOUNT, which are never modified. A
 * refcount <= 0 triggers a panic. */
void decrRefCount(robj *o) {
    if (o->refcount != 1) {
        /* Not the last reference: validate, then just decrement. */
        if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
        return;
    }

    /* Last reference: release the payload according to the object type. */
    switch (o->type) {
    case OBJ_STRING:
        freeStringObject(o);
        break;
    case OBJ_LIST:
        freeListObject(o);
        break;
    case OBJ_SET:
        freeSetObject(o);
        break;
    case OBJ_ZSET:
        freeZsetObject(o);
        break;
    case OBJ_HASH:
        freeHashObject(o);
        break;
    case OBJ_MODULE:
        freeModuleObject(o);
        break;
    case OBJ_STREAM:
        freeStreamObject(o);
        break;
    default:
        serverPanic("Unknown object type");
        break;
    }

    /* Finally release the object header itself. */
    zfree(o);
}
/* Release the payload of a stream object by handing o->ptr to freeStream().
 * The object header 'o' itself is not freed here. */
void freeStreamObject(robj *o) {
    void *payload = o->ptr;

    freeStream(payload);
}
1.5. 最近操作时间(lru)
lru 主要是记录对象的最近使用时间和使用频率,具体用法可以参考 [redis 源码走读] maxmemory 数据淘汰策略
2. 创建对象
/* Allocate a new object header wrapping 'ptr'.
 *
 * The object starts with the given type, OBJ_ENCODING_RAW encoding and a
 * reference count of 1. The 'lru' field is seeded from LRU_CLOCK(), or,
 * when the maxmemory policy has MAXMEMORY_FLAG_LFU set, with the value of
 * LFUGetTimeInMinutes() shifted left by 8 bits OR-ed with LFU_INIT_VAL.
 *
 * Returns the newly allocated object. */
robj *createObject(int type, void *ptr) {
    robj *obj = zmalloc(sizeof(*obj));

    obj->refcount = 1;
    obj->ptr = ptr;
    obj->type = type;
    obj->encoding = OBJ_ENCODING_RAW;

    if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
        /* LRU mode: remember the current LRU clock. */
        obj->lru = LRU_CLOCK();
    } else {
        /* LFU mode: time in the upper bits, initial counter in the low 8. */
        obj->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    }
    return obj;
}
3. 工作流程
对象实现可以通过调试,看看实现逻辑。刚看完整数集合,可以跟踪下实现流程。
3.1. 命令
sadd keytest 1 2 3
3.2. 堆栈
下断点看看命令工作流程
intsetNew() (/Users/xxx/src/other/redis/src/intset.c:100)
createIntsetObject() (/Users/xxx/src/other/redis/src/object.c:236)
setTypeCreate(sds value) (/Users/xxx/src/other/redis/src/t_set.c:44)
saddCommand(client * c) (/Users/xxx/src/other/redis/src/t_set.c:270)
...
aeMain(aeEventLoop * eventLoop) (/Users/xxx/src/other/redis/src/ae.c:515)
main(int argc, char ** argv) (/Users/xxx/src/other/redis/src/server.c:5054)
参考:《用 gdb 调试 redis》
3.3. 命令结构
/* Descriptor of a single command: one entry of redisCommandTable and the
 * value stored in the server.commands / server.orig_commands dictionaries. */
struct redisCommand {
/* Command name; used as the dictionary key in populateCommandTable(). */
char *name;
/* Handler implementing the command (e.g. saddCommand for "sadd"). */
redisCommandProc *proc;
/* Argument count specification (the "sadd" table entry uses -3).
 * NOTE(review): a negative value presumably means "at least N arguments";
 * confirm against the arity check, which is not shown in this excerpt. */
int arity;
char *sflags; /* Flags as a string, e.g. "write use-memory fast @set". */
uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* Positions of the key arguments in the command line. */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
/* NOTE(review): presumably cumulative execution time and number of calls;
 * both are initialized to 0 in the table entry. Confirm against callers. */
long long microseconds, calls;
int id; /* Command ID. This is a progressive ID starting from 0 that
is assigned at runtime, and is used in order to check
ACLs. A connection is able to execute a given command if
the user associated to the connection has this command
bit set in the bitmap of allowed commands. */
};
- 初始化命令
/* Excerpt of the global server state: only the two command dictionaries are
 * shown; the remaining fields are elided ("..."). */
struct redisServer {
...
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
...
};
/* Excerpt of the static command table; only the "sadd" entry is shown.
 * Initializer values follow the field order of struct redisCommand:
 * name, proc, arity, sflags, flags, getkeys_proc, firstkey, lastkey,
 * keystep, microseconds, calls, id. */
struct redisCommand redisCommandTable[] = {
...
{"sadd",saddCommand,-3,
"write use-memory fast @set",
0,NULL,1,1,1,0,0,0},
...
}
/* Excerpt of the program entry point: only the call to initServerConfig(),
 * which builds the command table, is shown; everything else is elided. */
int main(int argc, char **argv) {
...
initServerConfig();
...
}
/* Excerpt of the server configuration setup: shows only the creation and
 * population of the command dictionaries; other steps are elided. */
void initServerConfig(void) {
...
/* Command table -- we initialize it here as it is part of the
* initial configuration, since command names may be changed via
* redis.conf using the rename-command directive. */
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
/* Cache the lookup of the "del" command in the server state. */
server.delCommand = lookupCommandByCString("del");
...
}
/* Fill server.commands and server.orig_commands from the hard coded
 * redisCommandTable array.
 *
 * For every table entry the textual flags are parsed (panicking on an
 * unsupported flag), an ACL command ID is assigned, and the entry is added
 * to both dictionaries under its name. Both insertions must succeed. */
void populateCommandTable(void) {
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
    int idx = 0;

    while (idx < numcommands) {
        struct redisCommand *cmd = &redisCommandTable[idx];
        int retval1, retval2;

        /* Turn the flags description string into the actual flag bits. */
        if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        /* Assign the ID used for ACL. */
        cmd->id = ACLGetCommandID(cmd->name);

        /* Register the command in the main table... */
        retval1 = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        /* ...and in the additional dictionary that is unaffected by
         * rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(cmd->name), cmd);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);

        idx++;
    }
}
- 根据命令字符串,查找对应的 redisCommand
/* Excerpt of the command dispatcher: shows only the step that resolves the
 * command name (c->argv[0]) to its redisCommand and stores it in both
 * c->cmd and c->lastcmd.
 * NOTE(review): the rest of the function, including its return statement,
 * is not shown in this excerpt. */
int processCommand(client *c) {
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
}
- 命令处理函数
/* SADD command handler: c->argv[1] is the key and c->argv[2..argc-1] are
 * the members to add.
 *
 * If the key does not exist, a new set is created (its encoding is chosen
 * by setTypeCreate() from the first member) and stored in the database.
 * If the key holds a non-set value, the wrong-type error is sent and
 * nothing else happens. Replies with the number of members added. */
void saddCommand(client *c) {
    int added = 0;
    int idx = 2;
    robj *set = lookupKeyWrite(c->db,c->argv[1]);

    /* An existing key must hold a set. */
    if (set != NULL && set->type != OBJ_SET) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Missing key: create the set, picking the encoding from the first
     * member, and register it in the database. */
    if (set == NULL) {
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db,c->argv[1],set);
    }

    /* Try to add every member; count the ones reported as added. */
    while (idx < c->argc) {
        if (setTypeAdd(set,c->argv[idx]->ptr)) added++;
        idx++;
    }

    /* Only signal and notify when the set actually changed. */
    if (added) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    }
    server.dirty += added;
    addReplyLongLong(c,added);
}
- 根据存储的数据,底层决定用哪种编码进行保存。
/* Build an empty set able to hold "value".
 *
 * If "value" can be represented as a long long, the set is created with
 * the intset encoding; otherwise a regular hash table set is created. */
robj *setTypeCreate(sds value) {
    int is_integer = (isSdsRepresentableAsLongLong(value,NULL) == C_OK);

    return is_integer ? createIntsetObject() : createSetObject();
}
/* Create an empty set object backed by a fresh intset and tagged with
 * OBJ_ENCODING_INTSET. */
robj *createIntsetObject(void) {
    robj *set = createObject(OBJ_SET, intsetNew());

    set->encoding = OBJ_ENCODING_INTSET;
    return set;
}
/* Create an empty set object backed by a dict (created with setDictType)
 * and tagged with OBJ_ENCODING_HT. */
robj *createSetObject(void) {
    robj *set = createObject(OBJ_SET, dictCreate(&setDictType,NULL));

    set->encoding = OBJ_ENCODING_HT;
    return set;
}
3.4. 其它
以上是集合的工作流程,其它功能的使用流程以此类推,当然你也可以走一下哈希功能,它的编码是 dict 和 ziplist 的结合。例如:
hset keytest filedtest valuetest
/* Create an empty hash object backed by a fresh ziplist and tagged with
 * OBJ_ENCODING_ZIPLIST. */
robj *createHashObject(void) {
    robj *hash = createObject(OBJ_HASH, ziplistNew());

    hash->encoding = OBJ_ENCODING_ZIPLIST;
    return hash;
}
/* Excerpt of the HSET command handler; declarations, argument validation
 * and the reply are elided ("..."). */
void hsetCommand(client *c) {
...
/* Fetch (or create) the hash stored at the key c->argv[1]; give up if the
 * helper returns NULL. */
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
/* Inspect arguments 2..argc-1 and possibly convert the hash encoding
 * before inserting anything. */
hashTypeTryConversion(o,c->argv,2,c->argc-1);
/* Arguments are consumed in (field, value) pairs; 'created' counts the
 * calls for which hashTypeSet() returned 0. */
for (i = 2; i < c->argc; i += 2)
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
...
}
/* Excerpt: scan argv[start..end] (inclusive) and, as soon as one
 * sds-encoded argument is longer than server.hash_max_ziplist_value,
 * convert 'o' to OBJ_ENCODING_HT and stop scanning.
 * The beginning of the function is elided ("..."). */
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
...
for (i = start; i <= end; i++) {
if (sdsEncodedObject(argv[i]) &&
sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
{
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
}
}
}
/* Excerpt: set 'field' to 'value' in hash 'o', dispatching on the object's
 * encoding (ziplist or hash table). Any other encoding triggers a panic.
 * The per-encoding bodies and the return statement are elided ("...").
 * NOTE(review): 'update' is presumably the return value (non-zero when an
 * existing field was overwritten); confirm against the full source. */
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
...
} else if (o->encoding == OBJ_ENCODING_HT) {
...
} else {
serverPanic("Unknown hash encoding");
}
}
4. 后记
- 很多时候,我们理解面向对象,会单纯认为是指 C++、java 等语言。所谓面向对象,是对事物逻辑进行抽象,与语言无关,例如 c 语言实现的 redis 对象,就很好地阐述了这个问题。
- [redis 源码走读] 系列,根据《redis 设计与实现》书籍的目录路线,结合相关内容进行源码阅读。
- 原计划事无巨细地走读源码,后来发现时间不允许;很多细节可以在实际应用中不断查阅巩固,而且不少书籍、博客也有很详尽的解说。redis 在使用过程中,自己还有不少疑惑,带着问题查阅资料后做一些总结,便于遗忘时查阅。
5. 参考
- 《redis 设计与实现》