0%

Redis 相关功能实现

介绍了服务器与客户端相关联的结构及相关功能,二进制位数组的表示以及计算汉明重量的方法。

服务器与客户端交互

关联客户端

客户端链表结构

struct redisServer {
    //保存了全部客户端状态的链表, 便于通知客户端
    list *clients; // Client A -> Client B -> Client C
}

客户端标志位

客户端的标志属性 flags 记录了客户端的角色(role),以及客户端目前所处的状态:

typedef struct redisClient {
    //...
    
    int flags;
    
    //...
} redisClient;

flags 属性的值可以是单个标志:flags = <flag>
也可以是多个标志的二进制或,比如:flags = <flagl> | <flag2> | ...

/* Client flags */
#define CLIENT_SLAVE (1<<0)   /* This client is a replica */
#define CLIENT_MASTER (1<<1)  /* This client is a master */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MULTI (1<<3)   /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
#define CLIENT_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */
#define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in
                                  server.unblocked_clients */
#define CLIENT_LUA (1<<8) /* This is a non connected client used by Lua */
#define CLIENT_ASKING (1<<9)     /* Client issued the ASKING command */
#define CLIENT_CLOSE_ASAP (1<<10)/* Close this client ASAP */
#define CLIENT_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
#define CLIENT_DIRTY_EXEC (1<<12)  /* EXEC will fail for errors while queueing */
#define CLIENT_MASTER_FORCE_REPLY (1<<13)  /* Queue replies even if is master */
#define CLIENT_FORCE_AOF (1<<14)   /* Force AOF propagation of current cmd. */
#define CLIENT_FORCE_REPL (1<<15)  /* Force replication of current cmd. */
#define CLIENT_PRE_PSYNC (1<<16)   /* Instance don't understand PSYNC. */
#define CLIENT_READONLY (1<<17)    /* Cluster client is in read-only state. */
#define CLIENT_PUBSUB (1<<18)      /* Client is in Pub/Sub mode. */
#define CLIENT_PREVENT_AOF_PROP (1<<19)  /* Don't propagate to AOF. */
#define CLIENT_PREVENT_REPL_PROP (1<<20)  /* Don't propagate to slaves. */
#define CLIENT_PREVENT_PROP (CLIENT_PREVENT_AOF_PROP|CLIENT_PREVENT_REPL_PROP)
#define CLIENT_PENDING_WRITE (1<<21) /* Client has output to send but a write
                                        handler is yet not installed. */
#define CLIENT_REPLY_OFF (1<<22)   /* Don't send replies to client. */
#define CLIENT_REPLY_SKIP_NEXT (1<<23)  /* Set CLIENT_REPLY_SKIP for next cmd */
#define CLIENT_REPLY_SKIP (1<<24)  /* Don't send just this reply. */
#define CLIENT_LUA_DEBUG (1<<25)  /* Run EVAL in debug mode. */
#define CLIENT_LUA_DEBUG_SYNC (1<<26)  /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
                                       in the list of clients we can read
                                       from. */
#define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully
                                        * parsed command ready for execution. */
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
                                   perform client side caching. */
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
#define CLIENT_TRACKING_OPTIN (1ULL<<34)  /* Tracking in opt-in mode. */
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
                                              depending on optin/optout mode. */
#define CLIENT_TRACKING_NOLOOP (1ULL<<37) /* Don't send invalidation messages
                                             about writes performed by myself.*/
#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */
#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */
#define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands
                                               * and writing entire reply. */
#define CLIENT_DENY_BLOCKING (1ULL<<41) /* Indicate that the client should not be blocked.
                                           currently, turned on inside MULTI, Lua, RM_Call,
                                           and AOF client */

PUBSUB 命令和 SCRIPT LOAD 命令的特殊性

通常情况下, Redis 只会将那些对数据库进行了修改的命令写入到 AOF 文件,并复制到各个从服务器。如果一个命令没有对数据库进行任何修改,那么它就会被认为是只读命令,这个命令不会被写入到 AOF 文件,也不会被复制到从服务器。

以上规则适用于绝大部分 Redis 命令,但 PUBSUB命令和 SCRIPT LOAD 命令是其中的例外。 PUBSUB 命令虽然没有修改数据库,但 PUBSUB 命令向频道的所有订阅者发送消息。这一行为带有副作用,接收到消息的所有客户端的状态都会因为这个命令而改变。因此,服务器需要使用 REDIS FORCE AOF 标志,强制将这个命令写入 AOF 文件这样在将来载入 AOF 文件时,服务器就可以再次执行相同的 PUBSUB 命令,并产生相同的副作用。SCRIPT LOAD 命令的情况与 PUBSUB 命令类似:虽然 SCRIPT LOAD 命令没有修改数据库,但它修改了服务器状态,所以它是一个带有副作用的命令,服务器需要使用 REDIS FORCE AOF 标志,强制将这个命令写入 AOF 文件,使得将来在载入 AOF 文件时,服务器可以产生相同的副作用。

另外,为了让主服务器和从服务器都可以正确地载入 SCRIPT LOAD 命令指定的脚本,服务器需要使用 REDIS FORCE REPL 标志,强制将 SCRIPT LOAD 命令复制给所有从服务器。

订阅发布

CLIENT1>SUBSCRIBE "news.it" //订阅 new.it 频道
    
CLIENT1>PSUBSCRIBE "news.*" //订阅与 new.* 表达式相匹配的频道

CLIENT2>PUBLISH "news.it" "new message" // 向news.it频道推送信息

我们可以看到订阅分成两种指定频道订阅以及模式匹配订阅

指定频道订阅

struct redisServer {
    //...
    
    //所有指定频道的订阅关系
    dict *pubsub_channels;
    
    //...
}

客户端订阅频道

SUBSCRIBE "news.it"

Redis 客户端向服务器发送订阅频道请求,服务端通过将频道与客户端的映射关系存放到 redisServer 来实现对客户端订阅的管理,服务端会在对应频道的客户端链表上加入当前请求的客户端。

向频道推送数据

PUBLISH "news.it" "new message"

当客户端向某个频道 PUBLISH 一条新消息,服务端会遍历该频道对应的客户端链表并发送相应的数据。

取消订阅频道

UNSUBSCRIBE "news.it" "news.movie"

服务端收到客户端的取消订阅信息,将该客户端从 pubsub_channels 中对应的频道删除。

模式匹配订阅

struct redisServer {
    //..
    
    //保存所有模式匹配的订阅关系
    list *pubsub_patterns;
    
    //..
}
typedef struct pubsubPattern {
    
    //订阅模式的客户端
    redisClient *client;
    //被订阅的模式
    robj *pattern;
    
} pubsubPattern;

下图 pubsub_patterns 属性是一个链表,链表中的每个节点都包含着一个 pubsubPattern 结构,这个结构的 pattern 属性记录了被订阅的模式,而 client 属性则记录了订阅模式的客户端

客户端订阅频道

PSUBSCRIBE "news.*"

Redis 客户端向服务器发送订阅频道请求,服务端通过将频道与客户端的映射关系存放到 redisServer 来实现对客户端订阅的管理,服务端会在对应频道的客户端链表上加入当前请求的客户端。

向频道推送数据

PUBLISH "news.it" "new message"

当客户端向某个频道 PUBLISH 一条新消息,服务端会遍历 pubsub_patterns 链表并为匹配成功的客户端发送相应的数据。

取消订阅频道

PUNSUBSCRIBE "news.*"

服务端收到客户端的取消订阅信息,将该客户端从 pubsub_patterns 中删除。

监视器

发送 MONITOR 命令可以让一个普通客户端变为一个监视器。并将该客户端引用保存在 monitors 链表中。

每当一个客户端向服务器发送一条命令请求时,服务器除了会处理这条命令请求之外,还会将关于这条命令请求的信息发送给所有监视器。监视器可以看到所有监视服务器接收到的命令。

二进制位数组

位数组的表示

Redis 使用字符串对象来表示位数组,因为字符串对象使用的 SDS 数据结构是二进制安全的,所以程序可以直接使用 SDS 结构来保存位数组,并使用 SDS 结构的操作函数来处理位数组。

对于这个len为 1 的buf数组,buf[0]储存着一字节长的位数组,buf[1]储存着 SDS 程序自动追加到值的末尾的空字符\0

而下图是len为 3 的位数组。

BITCOUNT命令的实现

BITCOUNT 命令用于统计给定位数组中,值为 1 的二进制位的数量。而高效地实现这个命令需要用到一些精巧的算法。

BITCOUNT遍历算法

实现 BITCOUNT命令最简单直接的方法,就是遍历位数组中的每个二进制位,并在遇到值为 1 的二进制位时,将计数器的值增一。

BITCOUNT查表算法

优化检查操作的一个办法是使用查表法:

  • 对于一个有限集合来说,集合元素的排列方式是有限的。
  • 而对于一个有限长度的位数组来说,它能表示的二进制位排列也是有限的。

但查表法的实际效果会受到内存和缓存两方面因素的限制:

  • 因为査表法是典型的空间换时间策略。对于 8 位的二进制位的问题来说,创建键长为8位的表仅需数百个字节,但创建键长为 32 位的表却需要十多个 GB。在实际中,服务器只可能接受数百个字节或者数百KB的内存消耗。
  • 除了内存大小的问题之外,查表法的效果还会受到 CPU 缓存的限制:对于固定大小的 CPU 缓存来说,创建的表格越大,CPU 缓存所能保存的内容相比整个表格的比例就越少,查表时出现缓存不命中的情况就会越高,缓存的换入和换出操作就会越频繁,最终影响查表法的实际效率。

由于以上列举的两个原因,我们可以得出结论,查表法是一种比遍历算法更好的统计办法,但受限于查表法带来的内存压力,以及缓存不命中可能带来的影响,我们只能考虑创建键长为8位或者键长为16位的表,而这两种表带来的效率提升,对于处理非常长的位数组来说仍然远远不够。

variable-precision SWAR算法

BITCOUNT 命令要解决的问题 — 统计一个位数组中非 0 二进制位的数量,在数学上被称为”计算汉明重量(Hamming Weight)”。

因为汉明重量经常被用于信息论、编码理论和密码学,所以研究人员针对计算汉明重量开发了多种不同的算法,一些处理器甚至直接带有计算汉明重量的指令,而对于不具备这种特殊指令的普通处理器来说,目前已知效率最好的通用算法为 variable-precision SWAR 算法,该算法通过一系列位移和位运算操作,可以在常数时间内计算多个字节的汉明重量,并且不需要使用任何额外的内存。

以下是一个处理32位长度位数组的 variable-precision S WAR算法的实现:

uint32_t swar(uint32_t i) {
    //步骤1 0x55555555 -> 01010101010101010101010101010101
    //将 i 按每两个二进制位为一组进行分组,各组的十进制表示就是该组的汉明重量
    i = (i & 0x55555555) + ((i >> 1) & 0x55555555);
    //步骤2 0x33333333 -> 00110011001100110011001100110011
    //将 i 按每四个二进制位为一组进行分组,各组的十进制表示就是该组的汉明重量
    i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
    //步骤3 0x0F0F0F0F -> 00001111000011110000111100001111
    //将 i 按每八个二进制位为一组进行分组,各组的十进制表示就是该组的汉明重量
    i = (i & 0x0F0F0F0F) + ((i >> 4) & 0x0F0F0F0F);
    //步骤4
    //i * 0x01010101 计算出 bitarray 的汉明重量并记录在二进制位的最高八位
    //>> 24 语句则通过右移运算,将 bitarray 的汉明重量移动到最低八位
    i = ((i * 0x01010101) >> 24);
    return i;
}

swar 函数每次执行可以计算 32 个二进制位的汉明重量,它比之前介绍的遍历算法要快 32 倍,比键长为 8 位的查表法快 4 倍,并且因为 swar 函数空间和时间复杂度都是 O(1) 。当然,在一个循环里执行多个 swar 调用这种优化方式是有极限的:一旦循环中处理的位数组的大小超过了缓存的大小,这种优化的效果就会降低并最终消失。