2024-04-05
原文作者:文先生的博客 原文地址: http://wenfh2020.com/2020/02/05/redis-obj/

redis 数据(key-value),value 往往是对象的形式存在(redisObject)。


1. 对象

202404052231464701.png

1.1. 数据结构

 
    typedef struct redisObject {
        // 对象类型
        unsigned type:4;
        // 对象编码类型
        unsigned encoding:4;
        // 对象操作时间
        unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                                * LFU data (least significant 8 bits frequency
                                * and most significant 16 bits access time). */
        // 使用计数
        int refcount;
        // 数据内容
        void *ptr;
    } robj;

1.2. 类型(type)

 
    /* The actual Redis Object */
    #define OBJ_STRING 0    /* String object. */
    #define OBJ_LIST 1      /* List object. */
    #define OBJ_SET 2       /* Set object. */
    #define OBJ_ZSET 3      /* Sorted set object. */
    #define OBJ_HASH 4      /* Hash object. */

1.3. 编码(encoding)

 
    /* Objects encoding. Some kind of objects like Strings and Hashes can be
     * internally represented in multiple ways. The 'encoding' field of the object
     * is set to one of this fields for this object. */
    #define OBJ_ENCODING_RAW 0     /* Raw representation */
    #define OBJ_ENCODING_INT 1     /* Encoded as integer */
    #define OBJ_ENCODING_HT 2      /* Encoded as hash table */
    #define OBJ_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
    #define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
    #define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
    #define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
    #define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
    #define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
    #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
    #define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

1.4. 引用计数(refcount)

为了内存能重复使用,节省内存空间,redis 数据对象可能被共享,通过引用计数,统计对象被引用的次数。在内存回收时,当引用计数为 1,说明没有其它逻辑正在使用它,可以进行回收销毁。

object.c

 
    void incrRefCount(robj *o) {
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
    }
    
    void decrRefCount(robj *o) {
        if (o->refcount == 1) {
            switch(o->type) {
            case OBJ_STRING: freeStringObject(o); break;
            case OBJ_LIST: freeListObject(o); break;
            case OBJ_SET: freeSetObject(o); break;
            case OBJ_ZSET: freeZsetObject(o); break;
            case OBJ_HASH: freeHashObject(o); break;
            case OBJ_MODULE: freeModuleObject(o); break;
            case OBJ_STREAM: freeStreamObject(o); break;
            default: serverPanic("Unknown object type"); break;
            }
            zfree(o);
        } else {
            if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
            if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
        }
    }
    
    void freeStreamObject(robj *o) {
        freeStream(o->ptr);
    }

1.5. 最近操作时间(lru)

lru 主要是记录对象的最近使用时间和使用频率,具体用法可以参考 [redis 源码走读] maxmemory 数据淘汰策略


2. 创建对象

 
    robj *createObject(int type, void *ptr) {
        robj *o = zmalloc(sizeof(*o));
        o->type = type;
        o->encoding = OBJ_ENCODING_RAW;
        o->ptr = ptr;
        o->refcount = 1;
    
        /* Set the LRU to the current lruclock (minutes resolution), or
         * alternatively the LFU counter. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
        } else {
            o->lru = LRU_CLOCK();
        }
        return o;
    }

3. 工作流程

对象实现可以通过调试,看看实现逻辑。刚看完整数集合,可以跟踪下实现流程。

3.1. 命令

 
    sadd keytest 1 2 3

3.2. 堆栈

下断点看看命令工作流程

 
    intsetNew() (/Users/xxx/src/other/redis/src/intset.c:100)
    createIntsetObject() (/Users/xxx/src/other/redis/src/object.c:236)
    setTypeCreate(sds value) (/Users/xxx/src/other/redis/src/t_set.c:44)
    saddCommand(client * c) (/Users/xxx/src/other/redis/src/t_set.c:270)
    ...
    aeMain(aeEventLoop * eventLoop) (/Users/xxx/src/other/redis/src/ae.c:515)
    main(int argc, char ** argv) (/Users/xxx/src/other/redis/src/server.c:5054)

参考:《用 gdb 调试 redis


3.3. 命令结构

 
    struct redisCommand {
        char *name;
        redisCommandProc *proc;
        int arity;
        char *sflags;   /* Flags as string representation, one char per flag. */
        uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
        /* Use a function to determine keys arguments in a command line.
         * Used for Redis Cluster redirect. */
        redisGetKeysProc *getkeys_proc;
        /* What keys should be loaded in background when calling this command? */
        int firstkey; /* The first argument that's a key (0 = no keys) */
        int lastkey;  /* The last argument that's a key */
        int keystep;  /* The step between first and last key */
        long long microseconds, calls;
        int id;     /* Command ID. This is a progressive ID starting from 0 that
                       is assigned at runtime, and is used in order to check
                       ACLs. A connection is able to execute a given command if
                       the user associated to the connection has this command
                       bit set in the bitmap of allowed commands. */
    };
  • 初始化命令
 
    struct redisServer {
        ...
        dict *commands;             /* Command table */
        dict *orig_commands;        /* Command table before command renaming. */
        ...
    };
    
    struct redisCommand redisCommandTable[] = {
        ...
        {"sadd",saddCommand,-3,
         "write use-memory fast @set",
         0,NULL,1,1,1,0,0,0},
        ...
    }
    
    int main(int argc, char **argv) {
        ...
        initServerConfig();
        ...
    }
    
    void initServerConfig(void) {
        ...
        /* Command table -- we initiialize it here as it is part of the
         * initial configuration, since command names may be changed via
         * redis.conf using the rename-command directive. */
        server.commands = dictCreate(&commandTableDictType,NULL);
        server.orig_commands = dictCreate(&commandTableDictType,NULL);
        populateCommandTable();
        server.delCommand = lookupCommandByCString("del");
        ...
    }
    
    /* Populates the Redis Command Table starting from the hard coded list
     * we have on top of redis.c file. */
    void populateCommandTable(void) {
        int j;
        int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
    
        for (j = 0; j < numcommands; j++) {
            struct redisCommand *c = redisCommandTable+j;
            int retval1, retval2;
    
            /* Translate the command string flags description into an actual
             * set of flags. */
            if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
                serverPanic("Unsupported command flag");
    
            c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
            retval1 = dictAdd(server.commands, sdsnew(c->name), c);
            /* Populate an additional dictionary that will be unaffected
             * by rename-command statements in redis.conf. */
            retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
            serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
        }
    }
  • 根据命令字符串,查找对应的 redisCommand
 
    int processCommand(client *c) {
        /* Now lookup the command and check ASAP about trivial error conditions
         * such as wrong arity, bad command name and so forth. */
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    }
  • 命令处理函数
 
    void saddCommand(client *c) {
        robj *set;
        int j, added = 0;
    
        // 查找 key 是否已经存在
        set = lookupKeyWrite(c->db,c->argv[1]);
        if (set == NULL) {
            // 根据命令数值去确定实现编码。
            set = setTypeCreate(c->argv[2]->ptr);
            dbAdd(c->db,c->argv[1],set);
        } else {
            if (set->type != OBJ_SET) {
                addReply(c,shared.wrongtypeerr);
                return;
            }
        }
    
        for (j = 2; j < c->argc; j++) {
            if (setTypeAdd(set,c->argv[j]->ptr)) added++;
        }
        if (added) {
            signalModifiedKey(c->db,c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
        }
        server.dirty += added;
        addReplyLongLong(c,added);
    }
  • 根据存储的数据,底层决定用那种编码进行保存。
 
    /* Factory method to return a set that *can* hold "value". When the object has
     * an integer-encodable value, an intset will be returned. Otherwise a regular
     * hash table. */
    robj *setTypeCreate(sds value) {
        if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
            return createIntsetObject();
        return createSetObject();
    }
    
    robj *createIntsetObject(void) {
        intset *is = intsetNew();
        robj *o = createObject(OBJ_SET,is);
        o->encoding = OBJ_ENCODING_INTSET;
        return o;
    }
    
    robj *createSetObject(void) {
        dict *d = dictCreate(&setDictType,NULL);
        robj *o = createObject(OBJ_SET,d);
        o->encoding = OBJ_ENCODING_HT;
        return o;
    }

3.4. 其它

以上是集合的工作流程,其它功能使用流程以此类推,当然你也可以走下哈希功能。编码是 dictziplist 的结合。推例如:

 
    hset keytest filedtest valuetest
 
    robj *createHashObject(void) {
        unsigned char *zl = ziplistNew();
        robj *o = createObject(OBJ_HASH, zl);
        o->encoding = OBJ_ENCODING_ZIPLIST;
        return o;
    }
    
    void hsetCommand(client *c) {
        ...
        if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
        hashTypeTryConversion(o,c->argv,2,c->argc-1);
    
        for (i = 2; i < c->argc; i += 2)
            created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
        ...
    }
    
    void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
        ...
        for (i = start; i <= end; i++) {
            if (sdsEncodedObject(argv[i]) &&
                sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
            {
                hashTypeConvert(o, OBJ_ENCODING_HT);
                break;
            }
        }
    }
    
    int hashTypeSet(robj *o, sds field, sds value, int flags) {
        int update = 0;
    
        if (o->encoding == OBJ_ENCODING_ZIPLIST) {
            ...
        } else if (o->encoding == OBJ_ENCODING_HT) {
            ...
        } else {
            serverPanic("Unknown hash encoding");
        }
    }

4. 后记

  • 很多时候,我们理解面向对象,会单纯认为 C++java 等语言。所谓面向对象,是对事物逻辑进行抽象,无关语言,例如 c 语言实现的 redis 对象,就很好阐述了这个问题。
  • [redis 源码走读] 系列,根据 《redis 设计与实现》书籍的目录路线,结合相关内容进行源码阅读。
  • 计划事无巨细走读源码,发现时间不允许,很多细节在实际应用中可以不断查阅巩固,而且不少书籍博客也有很详尽的解说。redis 在使用过程中,自己还有不少疑惑,带着问题,在查阅过资料后,做一些总结,便于遗忘查阅。

5. 参考

  • 《redis 设计与实现》
阅读全文