《单机数据库》

对Redis服务器的数据库实现进行介绍


介绍服务器保存数据库的方法,客户端切换数据库的方法,数据库保存键值对的方法,以及针对数据库的添加、删除、查看、更新的实现方法,服务器保存见得过期时间的方法,以及服务器自动删除过期键的方法,数据库通知功能的实现方法

redis.h/redisServer结构中定义了服务器的大量属性

数据库

redis.h/redisServer结构中有个redisDb *db;属性,它代表了所有数据库,每一项都指向了一个数据库

其中的属性int dbnum;/* Total number of configured DBs */则指明了数据库的个数,默认为16

redis.h/redisClient结构中也有一个redisDb *db属性,记录了该客户端使用的目标数据库。SELECT命令可以修改客户端的目标数据库,底层原理就是上述指针,使其指向服务器的不同数据库

键空间

1
2
3
4
5
6
7
8
9
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;

我们知道每个数据库都由上述结构表示,其中的dict字典保存了所有键值对,称这个字典为键空间(keysspace),键空间和用户所见的数据库是一一对应的

对键值对的添加、删除更新等操作,都是通过键空间来进行处理完成的

服务器不仅对键空间执行指定的读写操作,还会执行额外的维护操作

  • 会更新服务器的键空间命中次数和不命中次数
  • 更新键的LRU(最后一次使用)时间,可用于计算键的闲置时间
  • 删除过期键
  • 使用WATCH命令监视键,如果在该键被修改时,将其标记为脏(dirty),从而让事务程序注意到这个键被修改
  • 对脏键的计数器+1,这个计数器会触发服务器的持久化和复制操作
  • 修改键之后,就会按照配置触发发送响相应的数据库通知

生存时间或过期时间

通过EXPIREPEXPIRE命令可以以秒或毫秒给键设置生存时间(Time To Live,TTL)。同理,通过EXPIREATPEXPIREAT命令可以设置过期时间,过期时间来临时,服务器会自动删除该键

通过上述命令设置了键后,可以通过TTLPTTL查看键的剩余生存时间

前面提到的四个设置生存时间或是过期时间的命令,最终都是转换成PEXPIREAT命令来执行

redisDb结构中有expires属性,来保存数据库所有键的过期时间,称这个属性为过期字典。过期字典的键是一个指向键空间某个键对象,值就是UNIX时间戳的long long类型整数

过期删除策略

当一个键过期时,有三种策略去删除过期键

  • 定时删除:在设置键的过期时间的同时,设置一个定时器,让定时器执行对键的过期删除
  • 惰性删除:放任键过期不管,但每次从键空间取出键的时候,检查键是否过期
  • 定期删除:每隔一段时间,程序对数据库进行检查,删除里面的过期键

定时删除由于能及时删除过期键,对内存最为友好,但是设置定时器意味着要创建线程去占用CPU时间,当大量请求等待服务器处理时,就不应该把CPU时间浪费在处理过期键上

除此之外,创建一个定时器要用到Redis服务器的时间事件,而时间事件的实现方式无序链表,这意味着并不能高效处理特定事件

惰性删除对CPU友好,但是对内存不友好,过期键可能永远不会被删除(除非用户执行FLUSHDB),这就类似于内存泄漏

定期删除就是上面两种方法的折中,这种方法的关键在于设置删除操作的执行时长和频率

Redis使用的是惰性删除和定期删除

惰性删除

db.c/expireIfNeeded函数实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0;

/* If we are running in the context of a slave, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return 1;

/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}

所有读写数据库的命令都会调用上述函数对输入键进行检查。如第一行所示,如果键没有过期,不做动作,他就像过滤器,过滤掉过期的输入键

定期删除

redis.c/activeExpireCycle函数实现,服务器周期性函数redis.c/serverCron函数执行时,就会调用该函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace.
*
* No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every
* iteration.
*
* This kind of call is used when Redis detects that timelimit_exit is
* true, so there is more work to do, and we do it more incrementally from
* the beforeSleep() function of the event loop.
*
* Expire cycle type:
*
* If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
* "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
* microseconds, and is not repeated again before the same amount of time.
*
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */

void activeExpireCycle(int type) {
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Last DB tested. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */

int j, iteration = 0;
int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
long long start = ustime(), timelimit;

if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exited
* for time limt. Also don't repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit) return;
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
last_fast_cycle = start;
}

/* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;

/* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
* per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;

if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */

for (j = 0; j < dbs_per_call; j++) {
int expired;
redisDb *db = server.db+(current_db % server.dbnum);

/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;

/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;

/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();

/* When there are less than 1% filled slots getting random
* keys is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;

/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
ttl_sum = 0;
ttl_samples = 0;

if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

while (num--) {
dictEntry *de;
long long ttl;

if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl < 0) ttl = 0;
ttl_sum += ttl;
ttl_samples++;
}

/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;

if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
/* Smooth the value averaging with the previous one. */
db->avg_ttl = (db->avg_ttl+avg_ttl)/2;
}

/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
iteration++;
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
long long elapsed = ustime()-start;

latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
if (elapsed > timelimit) timelimit_exit = 1;
}
if (timelimit_exit) return;
/* We don't repeat the cycle if there are less than 25% of keys
* found expired in the current DB. */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
}

函数中定义了全局变量current_db,记录当前函数检查的进度,并在下一个检查的时候根据该值据需上次的检查。当检查完一轮时,将该值设置为0

从上述代码可以看出,该函数还有其他一些优化和细节值的讨论

复制

当服务器运行在复制模式下,从服务器的过期键删除动作由主服务器控制

  • 主服务器在删除一个过期键之后,会显式向所有从服务器发送一个DEL命令,告知从服务器删除该过期键
  • 从服务器在执行客户端发送的读命令时,即使碰到过期键也不会将过期键删除,继续像处理未过期键一样处理过期键

通过主从复制模型,也就是通过主服务器控制从服务器统一删除过期键,可以保证主从服务器数据的一致性???这里为什么说是一致性??明明查询从服务器上的过期键会返回该键,而不是返回nil??

通知

客户端通过订阅给定的频道或者模式,来获知数据库中键的变化,以及数据库中命令的执行情况

通知类型分为键空间通知(这类通知关注某个键执行了什么命令)和键事件通知(这类通知关注某个命令被什么键执行)

实现通知功能的是notify.c/notifyKeyspaceEvent函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/* The API provided to the rest of the Redis core is a simple function:
*
* notifyKeyspaceEvent(char *event, robj *key, int dbid);
*
* 'event' is a C string representing the event name.
* 'key' is a Redis object representing the key name.
* 'dbid' is the database ID where the key lives. */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];

/* If any modules are interested in events, notify the module system now.
* This bypasses the notifications configuration, but the module engine
* will only call event subscribers if the event type matches the types
* they are interested in. */
moduleNotifyKeyspaceEvent(type, event, key, dbid);

/* If notifications for this class of events are off, return ASAP. */
if (!(server.notify_keyspace_events & type)) return;

eventobj = createStringObject(event,strlen(event));

/* __keyspace@<db>__:<key> <event> notifications. */
if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}

/* __keyevent@<db>__:<event> <key> notifications. */
if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
chan = sdsnewlen("__keyevent@",11);
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, key);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
}

参数type指定要发送通知的类型,event、keys和dbid分别是事件的名称、产生事件的键和键对应的数据库编号

当命令需要发送数据库通知时,该命令就会调用该函数,并传递相应的参数

RDB持久化

在执行SAVEBGSAVE命令创建一个新的RDB文件时,已过期的键不会保存到该文件中

在载入RDB文件时,也会在文件中遇到过期键

  • 如果以主服务器模式运行,过期键不会载入数据库
  • 如果以从服务器模式运行,过期键也会被载入数据库

生成的RDB文件的两个命令的区别在于:SAVE是阻塞的,而BGSAVE会创建一个子进程,由子进程去创建RDB文件

在执行BGSAVE时,SAVEBGSAVE会被服务器拒绝,这样做是为了避免产生竞争

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
rio rdb;
int error = 0;

snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}

rioInitWithFile(&rdb,fp);

if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
return C_ERR;
}

serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
return C_OK;

werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}

当服务器启动时,会自动检测RDB文件,并自动载入,载入过程也是阻塞的

由于BGSAVE可以在后台执行,所有通过设置一定的参数就可以让其在后台自动保存,如通过命令SAVE设置SAVE 900 1就表示数据库在900秒内,数据库至少修改1次时,就会执行BGSAVE

这些参数都保存在saveparam结构体数组中

1
2
3
4
5
6
7
8
struct redisServer
{
...
struct saveparam *saveparams; /* Save points array for RDB */
long long dirty; /* Changes to DB from the last save */
time_t lastsave; /* Unix time of last successful save */
...
};

dirty计数器记录距离上一次成功执行SAVE或BGSAVE命令后,服务器对数据库状态进行了多少次修改(包括写入、删除、更新等)

lastsave是一个UNIX时间戳,保存上一次执行SAVE或BGSAVE命令的时间

Redis的服务器周期性操作函数serverCron默认每隔100毫秒会执行一次,其中一项工作就是遍历saveparam数组,如果符合条件就会执行SAVE或BGSAVE命令

RDB文件结构

  • REDIS:保存REDIS五个字符
  • db_version:记录RDB文件的版本号
  • database:保存任意多个数据库,每条记录由SELECTDB、db_number、key_value_pairs三部分组成。SELECTDB表示接下来读入的是数据库号码,当读入该号码后,服务器就会切换数据库,将接下来的键值对载入正确的数据库中
  • EOF:RDB文件正文结束标志
  • check_num:根据前面四部分计算得到的校验和,会和载入数据计算得到的校验和进行比对,检查RDB的完整性

对于key_value_pairs,TYPE指明了对象类型或底层编码,服务器根据该值决定如何读入和解释value的值。对于具有过期时间的键值对,还有两个字段,前面的EXPIRETIME_MS常量表示接下来读入的是以毫秒为单位的过期时间,ms就表示UNIX时间戳,也就是过期时间

value字段根据保存值的对象不同会有不同的结构

AOF持久化

与RDB保存数据库中的键值对来记录数据库状态不同,AOF(Append Only File)通过保存Redis服务器所执行的写命令来记录数据库状态

当服务器以AOF持久化模式运行时,只有当过期键被惰性删除或定期删除之后,才会在AOF文件追加一条命名,显式记录该键被删除

由于AOF文件的更新频率比RDB高,所以在AOF持久化功能开启时,服务器会优先载入AOF文件

AOF持久化功能的实现分为命令追加、文件写入、文件同步三个步骤

当服务器执行完一个写命名后,会以协议格式将被执行的写命令追加到服务器状态的aof_buf缓冲区的末尾

服务器在一个事件循环中会调用flush44AppendOnlyFile函数,来将aof_buf缓冲区的内容写入和保存到AOF文件里面,该函数的行为有配置文件中的参数appendfsync决定,具体参见配置文件注解

对于AOF文件载入,服务器会创建一个不带网络连接的伪客户端,有其来执行AOF中的指令,使数据库恢复

为了解决AOF文件体积膨胀的问题,Redis提供了AOF文件重写功能,服务器可以创建一个新的AOF文件来代替现有的AOF文件,但是新文件中不会包含冗余命令,而且保存的数据库状态相同

虽然叫AOF文件重写,但是不对现有AOF文件做任何读取、分析操作,而是直接读取当前数据库状态来完成。该功能通过aof_rewrite()函数实现

但是因为这个函数进行大量的写入操作,调用该函数的线程被长时间阻塞,所以Redis服务器将AOF重写操作放入子进程

  • 子进程进行AOF重写,父进程(即服务器)可以继续处理命令请求
  • 使用子进程而不是线程,因为子进程带有父进程的数据副本,可以避免使用锁的情况下,保证数据的安全性???

使用子进程也会产生一个数据不一致的问题,在子进程重写AOF期间,父进程服务器接受的新的命令请求会修改数据库当前状态。为了解决这个问题,服务器设置了一个AOF重写缓冲区。在服务器创建子进程之后,这个缓冲区得到使用,也就是说在子进程重写AOF期间,服务器要将执行的命令写入AOF缓冲区和AOF重写缓冲区

当子进程完成AOF重写之后,会向父进程发送一个信号,服务器接到这个信号之后,会调用信号处理函数

  • 将AOF重写缓冲区所有内容写入新的AOF文件
  • 将新的AOF文件改名,原子地覆盖现有的AOF文件

父进程只有在执行信号处理函数时会造成阻塞,对命令请求的影响降到最低

事件

Redis是一个事件驱动程序,需处理以下两类事件

  • 文件事件:Redis服务器通过套接字与客户端进行连接,而文件事件就是服务器对套接字操作的抽象,服务器和客户端通信会产生相应的文件事件,而服务器通过监听并处理这些事件来完成一系列的网络通信操作
  • 时间事件:服务器的一些操作(如serverCron)需要在给定的时间点执行,时间事件就是服务器对这类定时操作的抽象

文件事件

Redis使用了一个称为“A simple event-driven programming library”的自制异步事件库(以下简称“AE”,源文件为ae.c

Redis基于Reactor模型开发了自己的网络事件处理器,称为file event handler

  • file event handler使用I/O多路复用监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器
  • 当套接字准备好后,file event handler就会调用关联的事件处理器来处理事件

文件事件是对套接字操作的抽象。多个文件事件可能会并发出现(可想成是多个套接字准备好操作),但是I/O多路复用程序将套接字放入队列,以同步、每次一个套接字的方式向事件分派器发送套接字。只有上一个事件处理完毕,才会继续传送下一个套接字

Redis的I/O多路复用程序包装了iocp(windows),kqueue(freebsd)或是epoll(linux)等I/O多路复用函数,并实现了相同的API。在编译的时候会根据系统自动选择I/O多路复用程序的底层实现

1
2
#define AE_READABLE 1   /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
  • 当套接字变为可读时(如客户端对套接字执行write、close操作等),就会产生相应的AE_READABLE事件
  • 当套接字变为可写时(如客户端对套接字执行read操作),就会产生AE_WRITABLE事件

API

1
2
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)

该API接受一个套接字、一个事件类型(aeEventLoop)、一个事件处理器(aeFileProc),将给定套接字的给定事件类型加入到I/O多路复用程序的监听范围,并将事件和事件处理器关联

1
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)

接受一个套接字描述符和一个监听事件类型,取消对给定套接字的给定事件的监听,并取消事件和事件处理器的关联

1
int aeGetFileEvents(aeEventLoop *eventLoop, int fd)

返回套接字正在被监听的事件类型

1
int aeWait(int fd, int mask, long long milliseconds)

在给定时间内阻塞并等待套接字的给定类型事件产生,事件产生或超时则返回

1
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

在给定时间内,阻塞并等待所有设置为监听状态的套接字产生文件事件,只要产生一个或超时就返回

1
int aeProcessEvents(aeEventLoop *eventLoop, int flags)

该函数就是文件事件分配器,调用aeApiPoll等待事件产生,然后遍历所有已产生事件,并调用相应的事件处理器来处理事件

事件处理器

处理器源码在networking.c

连接应答处理器

为了对连接服务器的各个客户端进行应答,服务器需要为监听套接字关联连接应答处理器

1
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)

实质就是对套接字accept函数的包装。当服务器初始化的时候,就会将该处理器和服务器监听套接字的AE_READABLE事件关联起来

命令请求处理器

1
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)

实质就是对read函数的封装。当客户端连接成功时,就会将客户端的套接字的AE_READABLE事件和命令请求处理器关联

命令回复处理器

1
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)

实质是对write函数的封装。当需要命令回复传回客户端时,服务器将客户端套接字的AE_WRITABLE事件和命令回复处理器关联。当客户端套接字准备好接受时,就会产生AE_WRITABLE事件,并引发处理器执行

时间事件

时间事件分为:定时事件,在指定时间执行;周期性事件,每隔指定时间执行

一个时间事件有:id(全局唯一标识符)、when(UNIX时间戳)和timeproc(时间事件处理器)组成。服务器将所有时间事件放在一个无序链表中,每当时间事件执行器运行时,遍历整个链表,找到符合条件的时间事件,并调用时间事件处理器

API

1
2
3
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)

将新的时间事件添加到服务器

1
static int processTimeEvents(aeEventLoop *eventLoop)

时间事件的执行器,遍历所有时间事件,并调用时间处理器来处理

Redis的serverCron函数就是一个时间事件,它定期对自身的资源和状态进行检查和调整

  • 更新服务器数据,如时间、内存
  • 清理过期键
  • 关闭无效客户端
  • AOF或RDB持久化
  • 集群模式时,定期同步和连接测试

事件调度

1
int aeProcessEvents(aeEventLoop *eventLoop, int flags)

这就是之前提到的API,也就是文件事件分派器,由它来对事件进行调度和分派

对于文件事件和时间事件的处理都是同步、有序和原子的。服务器不会中断事件处理,也不会对事件抢占。因此对于文件事件和时间事件处理器要自觉,尽量少占用阻塞时间,必要时还要主动退出

客户端

通过使用由I/O多路复用技术实现的文件事件处理器,Redis服务器使用单线程单进程的方式处理命令请求,并与多个客户端进行网络通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/* With multiplexing we need to take per-client state.
* Clients are taken in a liked list. */
typedef struct redisClient {
uint64_t id; /* Client incremental unique ID. */
int fd;
redisDb *db;
int dictid;
robj *name; /* As set by CLIENT SETNAME */
sds querybuf;
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
int argc;
robj **argv;
struct redisCommand *cmd, *lastcmd;
int reqtype;
int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */
list *reply;
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time */
time_t lastinteraction; /* time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */

/* Response buffer */
int bufpos;
char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;

服务器