Redis5设计与源码分析 (第9章 命令处理生命周期)

Redis服务器是典型的事件驱动程序,而Redis将事件分为两大类:文件事件与时间事件。文件事件即socket的读写事件,时间事件用于处理一些需要周期性执行的定时任务,本章将对这两种事件作详细介绍。

9.1 基本知识

比如客户端信息的存储,Redis对外支持的命令集合,客户端与服务器socket读写事件的处理,Redis内部定时任务的执行等,本节将对这些知识进行简要介绍。

9.1.1 对象结构体robj

key只能是字符串,value可以是字符串、列表、集合、有序集合和散列表,这5种数据类型用结构体robj表示,我们称之为Redis对象。结构体robj的type字段表示对象类型,5种对象类型在server.h文件中定义:

/* The actual Redis Object */

#define OBJ_STRING 0 /* String object. */

#define OBJ_LIST 1 /* List object. */

#define OBJ_SET 2 /* Set object. */

#define OBJ_ZSET 3 /* Sorted set object. */

#define OBJ_HASH 4 /* Hash object. */

结构体robj的encoding字段表示当前对象底层存储采用的数据结构,即对象的编码,总共定义了11种encoding常量,见表9-1:

Redis5设计与源码分析 (第9章 命令处理生命周期)

表9-1 对象编码类型表

对象的整个生命周期中,编码不是一成不变的,比如集合对象。当集合中所有元素都可以用整数表示时,底层数据结构采用整数集合;当执行sadd命令向集合中添加元素时,Redis总会校验待添加元素是否可以解析为整数,如果解析失败,则会将集合存储结构转换为字典。如下所示:

if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {

subject->ptr = intsetAdd(subject->ptr,llval,&success);

} else {

/* /编码转换 ,Failed to get integer from object, convert to regular set. */

setTypeConvert(subject,OBJ_ENCODING_HT);

}

字典与跳跃表各有优势,因此Redis会同时采用字典与跳跃表存储有序集合。

有序集合结构定义如下:

typedef struct zset {

dict *dict;

zskiplist *zsl;

} zset;

 

结构体robj的定义:

typedef struct redisObject {

unsigned type:4;

unsigned encoding:4;

unsigned lru:LRU_BITS; /* 24, 缓存淘汰使用 */

int refcount; // 引用计数

void *ptr;

} robj;

结构体各字段含义:

1)ptr 是void*类型的指针,指向实际存储的某一种数据结构,但是当robj存储的数据可以用long类型表示时,数据直接存储在ptr字段。可以看出,为了创建一个字符串对象,必须分配两次内存,robj与sds存储空间;两次内存分配效率低下,且数据分离存储降低了计算机高速缓存的效率。因此提出OBJ_ENCODING_EMBSTR编码的字符串,当字符串内容比较短时,只分配一次内存robj与sds连续存储,以此提升内存分配效率与数据访问效率

OBJ_ENCODING_EMBSTR编码的字符串内存结构如图9-1所示。

Redis5设计与源码分析 (第9章 命令处理生命周期)

图9-1 EMBSTR编码字符串对象内存结构

 

2)refcount 存储当前对象的引用次数,用于实现对象的共享。共享对象时,refcount加1;删除对象时,refcount减1,当refcount值为0时释放对象空间。删除对象的代码如下:

void decrRefCount(robj *o) {

if (o->refcount == 1) {

switch(o->type) { // 根据对象类型,释放其指向数据结构空间

case OBJ_STRING: freeStringObject(o); break;

case OBJ_LIST: freeListObject(o); break;

case OBJ_SET: freeSetObject(o); break;

case OBJ_ZSET: freeZsetObject(o); break;

case OBJ_HASH: freeHashObject(o); break;

case OBJ_MODULE: freeModuleObject(o); break;

case OBJ_STREAM: freeStreamObject(o); break;

default: serverPanic("Unknown object type"); break;

}

zfree(o); //释放对象空间

} else {

if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");

if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--; //引用计数减1

}

}

 

3)lru字段 占24比特,用于实现缓存淘汰策略,可以在配置文件中使用maxmemory-policy配置已用内存达到最大内存限制时的缓存淘汰策略。lru根据用户配置的缓存淘汰策略存储不同数据,常用的策略就是LRU与LFU。

LRU的核心思想是,如果数据最近被访问过,那么将来被访问的几率也更高,此时lru字段存储的是对象访问时间;

LFU的核心思想是,如果数据过去被访问多次,那么将来被访问的频率也更高,此时lru字段存储的是上次访问时间与访问次数。例如使用GET命令访问数据时,会执行下面代码更新对象的lru字段:

if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {

updateLFU(val);

} else {

val->lru = LRU_CLOCK();

}

LRU_CLOCK函数用于获取当前时间,注意此时间不是实时获取的,Redis以1秒为周期执行系统调用获取精确时间,缓存在全局变量server.lruclock,LRU_CLOCK函数获取的只是该缓存时间。

updateLFU函数用于更新对象的上次访问时间与访问次数,函数实现如下:

void updateLFU(robj *val) {

unsigned long counter = LFUDecrAndReturn(val);

counter = LFULogIncr(counter);

val->lru = (LFUGetTimeInMinutes()<<8) | counter;

}

 

可以发现lru的低8比特存储的是对象的访问次数,高16比特存储的是对象的上次访问时间,以分钟为单位;需要特别注意的是函数LFUDecrAndReturn,其返回计数值counter,对象的访问次数在此值上累加。为什么不直接累加呢?因为假设每次只是简单的对访问次数累加,那么越老的数据一般情况下访问次数越大,即使该对象可能很长时间已经没有访问,相反新对象的访问次数通常会比较小,显然这是不公平的。因此访问次数应该有一个随时间衰减的过程,函数

LFUDecrAndReturn实现了此衰减功能。

9.1.2 客户端结构体client

Redis是典型的客户端服务器结构,客户端通过socket与服务端建立网络连接并发送命令请求,服务端处理命令请求并回复。Redis使用结构体client存储客户端连接的所有信息,包括但不限于客户端的名称、客户端连接的套接字描述符、客户端当前选择的数据库ID、客户端的输入缓冲区与输出缓冲区等。结构体client字段较多,此处只介绍命令处理主流程所需的关键字段。

typedef struct client {

uint64_t id; //客户端唯一ID,通过全局变量server.next_client_id实现

int fd; //客户端socket的文件描述符

redisDb *db; //客户端使用select命令选择的数据库对象

robj *name; //客户端名称,可以使用命令CLIENT SETNAME设置

time_t lastinteraction //客户端上次与服务器交互的时间,以此实现客户端的超时处理

sds querybuf; //输入缓冲区,recv函数接收的客户端命令请求会暂时缓存在此缓冲区

int argc; //输入缓冲区的命令请求是按照Redis协议格式编码字符串,需要解析出命令请求的所有参数,参数个数存储在argc字段参数内容被解析为robj对象,存储在argv数组

robj **argv;

struct redisCommand *cmd; //待执行的客户端命令;解析命令请求后,会根据命令名

称查找该命令对应的命令对象,存储在客户端cmd字段,可以看到其类型为struct redisCommand

list *reply; //输出链表,存储待返回给客户端的命令回复数据。

unsigned long long reply_bytes; //表示输出链表中所有节点的存储空间总和

size_t sentlen; //表示已返回给客户端的字节数;

char buf[PROTO_REPLY_CHUNK_BYTES]; //输出缓冲区,存储待返回给客户端的命令回复数据,bufpos表示输出缓冲区中数据的最大字节位置,显然sentlen~bufpos区间的数据都是需要返回给客户端的。可以看到reply和buf都用于缓存待返回给客户端的命令回复数据,为什么同时需要reply和buf的存在呢?其实二者只是用于返回不同的数据类型而已。

int bufpos;

} client;

 

redisDb

redisDb结构体定义如下:

typedef struct redisDb {

dict *dict; /* 存储数据库所有键值对*/

dict *expires; /*存储键的过期时间 */

dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/

dict *ready_keys; /* Blocked keys that received a PUSH */

dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */

int id; /* 数据库序号,默认情况下Redis有16个数据库,id序号为0~15 */

long long avg_ttl; /* 存储数据库对象的平均TTL,用于统计*/

unsigned long expires_cursor; /* Cursor of the active expire cycle. */

list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */

} redisDb;

 

使用命令BLPOP阻塞获取列表元素时,如果链表为空,会阻塞客户端,同时将此列表键记录在blocking_keys;当使用命令PUSH向列表添加元素时,会从字典blocking_keys中查找该列表键,如果找到说明有客户端正阻塞等待获取此列表键,于是将此列表键记录到字典ready_keys,以便后续响应正在阻塞的客户端。

Redis事务

Redis支持事务,命令multi用于开启事务,命令exec用于执行事务;但是开启事务到执行事务期间,如何保证关心的数据不会被修改呢?Redis采用乐观锁实现。开启事务的同时可以使用watch key命令监控关心的数据键,而watched_keys字典存储的就是被watch命令监控的所有数据键,其中key-value分别为数据键与客户端对象。当Redis服务器接收到写命令时,会从字典watched_keys中查找该数据键,如果找到说明有客户端正在监控此数据键,于是标记客户端对象为dirty;待Redis服务器收到客户端exec命令时,如果客户端带有dirty标记,则会拒绝执行事务

 

reply.链表

链表节点存储的值类型为clientReplyBlock,定义如下:

可以看到链表节点本质上就是一个缓冲区(buffer);

typedef struct clientReplyBlock {

size_t size, used; //size表示缓冲区空间总大小,used表示缓冲区已使用空间大小

char buf[];

} clientReplyBlock;

 

 

9.1.3 服务端结构体redisServer

结构体redisServer存储Redis服务器的所有信息,包括但不限于数据库、配置参数、命令表、监听端口与地址、客户端列表、若干统计信息、RDB与AOF持久化相关信息、主从复制相关信息、集群相关信息等。结构体redisServer的字段非常多,这里只对部分字段做简要说明,以便读者对服务端有个粗略了解,至于其他字段在讲解各知识点时会进行说明。

结构体redisServer定义如下:

struct redisServer {

char *configfile; //配置文件绝对路径

int dbnum; //数据库的数目,可通过参数databases配置,默认16

redisDb *db; //数据库数组,数组的每个元素都是redisDb类型

dict *commands; //命令字典,Redis支持的所有命令都存储在这个字典中,key为命令名称,vaue为struct redisCommand对象。

aeEventLoop *el; //Redis是典型的事件驱动程序,el代表Redis的事件循环,类型为aeEventLoop

int port; //服务器监听端口号,可通过参数port配置,默认端口号6379。

char *bindaddr[CONFIG_BINDADDR_MAX]; //绑定的所有IP地址,可以通过参数bind配置多个,例如bind 192.168.1.10010.0.0.1,;CONFIG_BINDADDR_MAX常量为16,即最多绑定16个IP地址;Redis默认会绑定到当前机器所有可用的Ip地址;

int bindaddr_count; //bindaddr_count为用户配置的IP地址数目

int ipfd[CONFIG_BINDADDR_MAX]; //针对bindaddr字段的所有IP地址创建的socket文件描述符,

int ipfd_count; //ipfd_count为创建的socket文件描述符数目

list *clients; //当前连接到Redis服务器的所有客户端。

int maxidletime; //最大空闲时间,可通过参数timeout配置,结合client对象的lastinteraction字段,当客户端没有与服务器交互的时间超过maxidletime时,会认为客户端超时并释放该客户端连接;

}

 

9.1.4 命令结构体redisCommand

Redis支持的所有命令初始都存储在全局变量

redisCommandTable,类型为redisCommand,定义及初始化如下:

struct redisCommand redisCommandTable[] = {

     {"module",moduleCommand,-2,

     "admin no-script",

     0,NULL,0,0,0,0,0,0},

 

     {"get",getCommand,2,

     "read-only fast @string",

     0,NULL,1,1,1,0,0,0},

...

}

 

结构体redisCommand相对简单,主要定义了命令的名称、命令处理函数以及命令标志等:

struct redisCommand {

char *name; // 命令名称

redisCommandProc *proc; //命令处理函数

int arity; //命令参数数目,用于校验命令请求格式是否正确;当arity小于0时,表示命令参数数目大于等于arity;当arity大于0时,表示命令参数数目必须为arity;注意命令请求中,命令的名称本身也是一个参数,如get命令的参数数目为2,命令请求格式为get key。

char *sflags; /* 命令标志,例如标识命令时读命令还是写命令,详情参见表9-2;注意到sflags的类型为字符串,此处只是为了良好的可读性。 */

uint64_t flags; /* 命令的二进制标志,服务器启动时解析sflags字段生成*/

/* 使用函数确定命令行中的键自变量。 用于Redis群集重定向。*/

redisGetKeysProc *getkeys_proc;

/* 调用此命令时应在后台加载哪些键? */

int firstkey; /* The first argument that's a key (0 = no keys) */

int lastkey; /* The last argument that's a key */

int keystep; /* 第一个和最后一个键之间的步骤*/

long long microseconds, calls;

// calls :从服务器启动至今命令执行的次数,用于统计

// microseconds:从服务器启动至今命令总的执行时间,microseconds/calls即可计算出该 命令的平均处理时间,用于统计

int id; //命令id

};

表9-2 命令标志类型

Redis5设计与源码分析 (第9章 命令处理生命周期)

当服务器接收到一条命令请求时,需要从命令表中查找命令,而redisCommandTable命令表是一个数组,意味着查询命令的时间复杂度为O(N),效率低下。

因此Redis在服务器初始化时,会将redisCommandTable转换为一个字典存储在redisServer对象的commands字段,key为命令名称,value为命令redisCommand对象。populateCommandTable函数实现了命令表从数组到字典的转化,同时解析sflags生成flags:

/* Populates the Redis Command Table starting from the hard coded list

* we have on top of redis.c file. */

void populateCommandTable(void) {

int j;

int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

for (j = 0; j < numcommands; j++) {

struct redisCommand *c = redisCommandTable+j;

int retval1, retval2;

/* 将命令字符串标志描述转换为实际的标志集.

修改标志字符串描述'strflags'并将它们设置为命令'c'。

如果所有标志均有效,则返回C_OK,否则返回*C_ERR(但已在命令中设置了已识别的标志

*/

if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)

serverPanic("Unsupported command flag");

c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */

retval1 = dictAdd(server.commands, sdsnew(c->name), c);

/* 使用redis.conf中的重命名命令语句填充一个不会受到影响的附加词典。 */

retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);

serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);

}

}

 

对于经常使用的命令,Redis甚至会在服务器初始化的时候将命令缓存在redisServer对象,这样使用的时候就不需要每次都从commands字典中查找了:

struct redisServer {

/* Fast pointers to often looked up command */

struct redisCommand *delCommand, *multiCommand, *lpushCommand,

*lpopCommand, *rpopCommand, *zpopminCommand,

*zpopmaxCommand, *sremCommand, *execCommand,

*expireCommand, *pexpireCommand, *xclaimCommand,

*xgroupCommand, *rpoplpushCommand;...

}

9.1.5 事件处理

Redis服务器是典型的事件驱动程序,而事件又分为文件事件(socket的可读可写事件)与时间事件(定时任务)两大类。无论是文件事件还是时间事件都封装在结构体aeEventLoop中:

/* State of an event based program */

typedef struct aeEventLoop {

int maxfd; /* 当前注册的最高文件描述符 */

int setsize; /* 跟踪的文件描述符的最大数量 */

long long timeEventNextId;

time_t lastTime; /* Used to detect system clock skew */

aeFileEvent *events; /* 注册的文件事件数组 */

aeFiredEvent *fired; /* 存储被触发的文件事件 */

aeTimeEvent *timeEventHead; // 时间事件链表头节点

int stop; // 标识事件循环是否结束

void *apidata; /*Redis底层可以使用4种I/O多路复用模型(kqueue、epoll等),apidata是对这4种模型的进一步封装。 */

aeBeforeSleepProc *beforesleep; // Redis服务器需要阻塞等待文件事件的发生,进程阻塞之前会调用beforesleep函数

aeBeforeSleepProc *aftersleep; // 进程因为某种原因被唤醒之后会调用aftersleep函数

int flags;

} aeEventLoop;

事件驱动程序通常存在while/for循环,循环等待事件发生并处理,Redis也不例外,其事件循环如下:

while (!eventLoop->stop) {

// 事件处理主函数 ,第2个参数是一个标志位

aeProcessEvents(eventLoop, AE_ALL_EVENTS| // 函数需要处理文件事件与时间事件

AE_CALL_BEFORE_SLEEP|

AE_CALL_AFTER_SLEEP); //阻塞等待文件事件之后需要执行aftersleep函数

}

1.文件事件

Redis客户端通过TCP socket与服务端交互,文件事件指的就是socket的可读可写事件。socket读写操作有阻塞与非阻塞之分。采用阻塞模式时,一个进程只能处理一条网络连接的读写事件,为了同时处理多条网络连接,通常会采用多线程或者多进程,效率低下;非阻塞模式下,可以使用目前比较成熟的I/O多路复用模型,如select/epoll/kqueue等,视不同操作系统而定。

epoll简要介绍。

epoll是Linux内核为处理大量并发网络连接而提出的解决方案,能显著提升系统CPU利用率。epoll使用非常简单,

总共只有3个API:

epoll_create 函数创建一个epoll专用的文件描述符,用于后续epoll相关API调用;

epoll_ctl 函数向epoll注册、修改或删除需要监控的事件;

epoll_wait 函数会阻塞进程,直到监控的若干网络连接有事件发生。

 

① int epoll_create(int size)

输入参数 size 通知内核程序期望注册的网络连接数目,内核以此判断初始分配空间大小;注意在Linux 2.6.8版本以后,内核动态分配空间,此参数会被忽略。返回参数为epoll专用的文件描述符,不再使用时应该及时关闭此文件描述符。

② int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

函数执行成功时返回0,否则返回-1,错误码设置在变量errno,输入参数含义如下。

·epfd:函数epoll_create返回的epoll文件描述符。

·op:需要进行的操作,EPOLL_CTL_ADD表示注册事件,EPOLL_CTL_MOD表示修改网络 连接事件,EPOLL_CTL_DEL表示删除事件。

·fd:网络连接的socket文件描述符。

·event:需要监控的事件,结构体epoll_event定义如下:

struct epoll_event {

__uint32_t events; //需要监控的事件类型,比较常用的是EPOLLIN文件描述符可读事件,EPOLLOUT文件描述符可写事件;

epoll_data_t data; //保存与文件描述符关联的数据。

};

typedef union epoll_data {

void *ptr;

int fd;

__uint32_t u32;

__uint64_t u64;

} epoll_data_t;

 

③ int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)

函数执行成功时返回0,否则返回-1,错误码设置在变量errno;输入参数含义如下:

·epfd:函数epoll_create返回的epoll文件描述符;

·epoll_event:作为输出参数使用,用于回传已触发的事件数组;

·maxevents:每次能处理的最大事件数目;

·timeout:epoll_wait 函数阻塞超时时间,如果超过时间还没有事件发生,函数不再阻塞直接返回;当timeout等于0时函数立即返回,timeout等于-1时函数会一直阻塞直到有事件发生。

 

Redis/O多路复用

Redis并没有直接使用epoll提供的API,而是同时支持4种I/O多路复用模型,并将这些模型的API进一步统一封装,由文件ae_evport.c、ae_epoll.c、ae_kqueue.c和ae_select.c实现。Redis在编译阶段,会检查操作系统支持的I/O多路复用模型,并按照一定规则决定使用哪种模型。

以epoll为例,

aeApiCreate函数是对epoll_create的封装;

aeApiAddEvent函数用于添加事件,是对epoll_ctl的封装;

aeApiDelEvent函数用于删除事件,是对epoll_ctl的封装;

aeApiPoll是对epoll_wait的封装。

 

static int aeApiCreate(aeEventLoop *eventLoop) ;

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

 

4个函数的输入参数含义如下。

·eventLoop:事件循环,与文件事件相关的最主要字段有3个,apidata指向I/O多路复用模型对象,注意4种I/O多路复用模型对象的类型不同,因此此字段是void*类型;events存储需要监控的事件数组,以socket文件描述符作为数组索引存取元素;fired存储已触发的事件数组。

以epoll模型为例,apidata字段指向的I/O多路复用模型对象定义如下:

typedef struct aeApiState {

int epfd; // 函数epoll_create返回的epoll文件描述符

struct epoll_event *events; //存储epoll_wait函数返回时已触发的事件数组

} aeApiState;

 

·fd:操作的socket文件描述符;

·mask 或delmask :添加或者删除的事件类型,AE_NONE表示没有任何事件;AE_READABLE表示可读事件;AE_WRITABLE表示可写事件;

·tvp:阻塞等待文件事件的超时时间。

 

这里只对等待事件函数aeApiPoll实现作简要介绍:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {

aeApiState *state = eventLoop->apidata;

int retval, numevents = 0;

// 阻塞等待事件的发生

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,

tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

if (retval > 0) {

int j;

numevents = retval;

//Redis再次遍历所有已触发事件

for (j = 0; j < numevents; j++) {

int mask = 0;

struct epoll_event *e = state->events+j;

// 转换事件类型为Redis定义的

if (e->events & EPOLLIN) mask |= AE_READABLE;

if (e->events & EPOLLOUT) mask |= AE_WRITABLE;

if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;

if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;

//记录已发生事件到fired数组

eventLoop->fired[j].fd = e->data.fd;

eventLoop->fired[j].mask = mask;

}

}

return numevents;

}

说明:

函数首先需要通过eventLoop->apidata字段获取epoll模型对应的aeApiState结构体对象,才能调用epoll_wait函数等待事件的发生;epoll_wait函数将已触发的事件存储到aeApiState对象的events字段,Redis再次遍历所有已触发事件,将其封装在eventLoop->fired数组,数组元素类型为结构体aeFiredEvent,只有两个字段,fd表示发生事件的socket文件描述符,mask表示发生的事件类型,如AE_READABLE可读事件和AE_WRITABLE可写事件。

 

文件事件

结构体aeEventLoop有一个关键字段events,类型为aeFileEvent数组,存储所有需要监控的文件事件。文件事件结构体定义如下:

typedef struct aeFileEvent {

int mask; // 存储监控的文件事件类型,如AE_READABLE可读事件和AE_WRITABLE可写事件

aeFileProc *rfileProc; // 为函数指针,指向读事件处理函数

aeFileProc *wfileProc; // 同样为函数指针,指向写事件处理函数

void *clientData; // 指向对应的客户端对象

} aeFileEvent;

 

调用aeApiAddEvent函数添加事件之前,首先需要调用aeCreateFileEvent函数创建对应的文件事件,并存储在aeEventLoop结构体的events字段,aeCreateFileEvent函数简单实现如下:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,

aeFileProc *proc, void *clientData)

{

aeFileEvent *fe = &eventLoop->events[fd];

if (aeApiAddEvent(eventLoop, fd, mask) == -1)

return AE_ERR;

fe->mask |= mask;

if (mask & AE_READABLE) fe->rfileProc = proc;

if (mask & AE_WRITABLE) fe->wfileProc = proc;

fe->clientData = clientData;

if (fd > eventLoop->maxfd)

eventLoop->maxfd = fd;

return AE_OK;

}

 

Redis服务器启动时需要创建socket并监听等待客户端连接;客户端与服务器建立socket连接之后,服务器会等待客户端的命令请求;服务器处理完客户端的命令请求之后,命令回复会暂时缓存在client结构体的buf缓冲区待客户端文件描述符的可写事件发生时,才会真正往客户端发送命令回复

这些都需要创建对应的文件事件:

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,

acceptTcpHandler,NULL);

aeCreateFileEvent(server.el,fd,AE_READABLE,

readQueryFromClient, c);

aeCreateFileEvent(server.el, c->fd, ae_flags,

sendReplyToClient, c);

接收客户端连接的处理函数为acceptTcpHandler,此时还没有创建对应的客户端对象,因此函数aeCreateFileEvent第4个参数为NULL;

接收客户端命令请求的处理函数为readQueryFromClient

向客户端发送命令回复的处理函数为sendReplyToClient

 

最后思考一个问题,aeApiPoll函数的第2个参数是时间结构体timeval,存储调用epoll_wait时传入的超时时间,那么这个时间是怎么计算出来的呢?我们之前提过,Redis除了要处理各种文件事件外,还需要处理很多定时任务(时间事件),那么当Redis由于执行epoll_wait而阻塞时,恰巧定时任务到期而需要处理怎么办?要回答这个问题需要分析Redis事件循环的执行函数aeProcessEvents,函数在调用aeApiPoll之前会遍历Redis的时间事件链表,查找最早会发生的时间事件,以此作为aeApiPoll需要传入的超时时间。如下所示:

 

int aeProcessEvents(aeEventLoop *eventLoop, int flags)

{

shortest = aeSearchNearestTimer(eventLoop);

long long ms =

(shortest->when_sec - now_sec)*1000 +

shortest->when_ms - now_ms;

…………

/* 阻塞等待文件事件发生. */

numevents = aeApiPoll(eventLoop, tvp);

 

for (j = 0; j < numevents; j++) {

aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

//处理文件事件,即根据类型执行rfileProc或wfileProc

/* 处理时间事件*/

if (flags & AE_TIME_EVENTS)

processed += processTimeEvents(eventLoop);

}

该方法注释 : 处理每个待处理的时间事件,然后处理每个待处理的文件事件(可以由刚刚处理的时间事件回调注册)。 如果没有特殊标志,该功能将一直休眠,直到引发某些文件事件或下次发生事件(如果有)时为止。

 

2.时间事件

前面介绍了Redis文件事件,已经知道事件循环执行函数aeProcessEvents的主要逻辑:

①查找最早会发生的时间事件,计算超时时间;

②阻塞等待文件事件的产生;

③处理文件事件;

④处理时间事件。时间事件的执行函数为processTimeEvents

 

Redis服务器内部有很多定时任务需要执行,比如定时清除超时客户端连接,定时删除过期键等,定时任务被封装为时间事件aeTimeEvent对象,多个时间事件形成链表,存储在aeEventLoop结构

体的timeEventHead字段,它指向链表首节点。时间事件aeTimeEvent定义如下:

/* Time event structure */

typedef struct aeTimeEvent {

long long id; /* 时间事件唯一ID,通过字段eventLoop->timeEventNextId实现; */

long when_sec; /* 时间事件触发的秒数 */

long when_ms; /* 时间事件触发的毫秒数 */

aeTimeProc *timeProc; // 函数指针,指向时间事件处理函数

aeEventFinalizerProc *finalizerProc; // 函数指针,删除时间事件节点之前会调用此函数;

void *clientData; // 指向对应的客户端对象

struct aeTimeEvent *prev;

struct aeTimeEvent *next; // 指向下一个时间事件节点

int refcount; /* refcount以防止在递归时间事件调用中释放计时器事件 */

} aeTimeEvent;

 

时间事件执行函数processTimeEvents

处理逻辑比较简单,只是遍历时间事件链表,判断当前时间事件是否已经到期,如果到期则执

行时间事件处理函数timeProc:

static int processTimeEvents(aeEventLoop *eventLoop) {

while(te) {

aeGetTime(&now_sec, &now_ms);

if (now_sec > te->when_sec ||

(now_sec == te->when_sec && now_ms >= te->when_ms))

{

// 处理时间事件

retval = te->timeProc(eventLoop, id, te->clientData);

//重新设置时间事件到期时间

if (retval != AE_NOMORE) {

aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);

}

}

te = te->next;

}

return processed;

}

注意时间事件处理函数timeProc返回值retval,其表示此时间事件下次应该被触发的时间,单位为毫秒,且是一个相对时间,即从当前时间算起,retval毫秒后此时间事件会被触发。其实Redis只有一个时间事件,看到这里读者可能会有疑惑,服务器内部不是有很多定时任务吗, 为什么只有一个时间事件呢?回答此问题之前我们需要先分析这个唯一的时间事件。

Redis创建时间事件节点的函数为aeCreateTimeEvent,内部实现非常简单,只是创建时间事件并添加到时间事件链表。aeCreateTimeEvent函数定义如下:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,

aeTimeProc *proc, void *clientData,

aeEventFinalizerProc *finalizerProc);

·eventLoop:输入参数指向事件循环结构体;

·milliseconds:表示此时间事件触发时间,单位毫秒,注意这是一个相对时间,即从当前时间算起,milliseconds毫秒后此时间事件会被触发;

·proc:指向时间事件的处理函数;

·clientData:指向对应的结构体对象;

·finalizerProc:同样是函数指针,删除时间事件节点之前会调用此函数。

 

读者可以在代码目录全局搜索aeCreateTimeEvent,会发现确实只创建了一个时间事件:

aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

该时间事件在1毫秒后会被触发,处理函数为serverCron,参数clientData与finalizerProc都为NULL。而函数serverCron实现了Redis服务器所有定时任务的周期执行。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

....

run_with_period(100) {

... //100毫秒周期执行

}

run_with_period(5000) {

... //5000毫秒周期执行

}

/*清除超时客户端连接 . */

clientsCron();

/* 处理数据库 */

databasesCron();

...

server.cronloops++; //变量server.cronloops用于记录serverCron函数的执行次数

return 1000/server.hz; //变量server.hz表示serverCron函数的执行频率,用户可配置,最小为1最大为500,默认为10

}

run_with_period宏定义实现了定时任务按照指定时间周期(_ms_)执行,此时会被替换为一个if条件判断,条件为真才会执行定时任务,定义如下

#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

serverCron函数会无条件执行某些定时任务,比如清除超时客户端连接,以及处理数据库(清除数据库过期键等)。需要特别注意一点,serverCron函数的执行时间不能过长,否则会导致服务器不能及时响应客户端的命令请求。

下面以过期键删除为例,分析Redis是如何保serverCron函数的执行时间。过期键删除由函数activeExpireCycle实现,由函数databasesCron调用,其函数是实现如下:

#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25

void activeExpireCycle(int type) {

timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;

timelimit_exit = 0;

 

for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {

do {

/* 查找过期键并删除. */

if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */

elapsed = ustime()-start;

if (elapsed > timelimit) {

timelimit_exit = 1;

server.stat_expired_time_cap_reached_count++;

break;

}

}

} while (sampled == 0 ||

(expired*100/sampled) > config_cycle_acceptable_stale);

}

}

最多遍历"dbs_per_call"个数据库,并记录每个数据库删除的过期键数目;当删除过期键数目大于门限时,认为此数据库过期键较多,需要再次处理。考虑到极端情况,当数据库键数目非常多且基本都过期时,do-while循环会一直执行下去。因此我们添加timelimit时间限制,每执行16次do-while循环,检测函数activeExpireCycle执行时间是否超过timelimit,如果超过则强制结束循环。

初看timelimit的计算方式可能会比较疑惑,其计算结果使得函数activeExpireCycle的总执行时间占CPU时间的25%,即每秒函数activeExpireCycle的总执行时间为1000000×25/100单位微秒。仍然假设server.hz取默认值10,即每秒函数activeExpireCycle执行10次,那么每次函数activeExpireCycle的执行时间为1000000×25/100/10,单位微秒。

 

9.2 server启动过程

学习Redis服务器的启动过程,主要分为server初始化,监听端口以及等待命令3节。

9.2.1 server初始化

服务器初始化主流程(见图9-2)可以简要分为7个步骤:

Redis5设计与源码分析 (第9章 命令处理生命周期)

①初始化配置,包括用户可配置的参数,以及命令表的初始化;

由函数initServerConfig实现,具体操作就是给配置参数赋初始值:

void initServerConfig(void) {

.......

// serverCron函数执行频率,默认10

server.hz = CONFIG_DEFAULT_HZ;

//监听端口,默认6379

server.port = CONFIG_DEFAULT_SERVER_PORT;

//最大客户端数目,默认10 000

server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;

//客户端超时时间,默认0,即永不超时

server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;

//数据库数目,默认16

server.dbnum = CONFIG_DEFAULT_DBNUM;

//初始化命令表

populateCommandTable();

.....

}

 

②加载并解析配置文件;

入口函数为loadServerConfig,

void loadServerConfig(char *filename, char *options) ;

filename表示配置文件全路径名称,options表示命令行输入的配置参数,

例如我们通常以如下命令启动Redis服务器:

/home/user/redis/redis-server /home/user/redis/redis.conf -p 4000

加载完成后会调用loadServerConfigFromString函数解析配置,输入参数config即配置字符串,实现如下:

void loadServerConfigFromString(char *config) {

// 分割配置字符串多行,totlines记录行数 , 因为 配置时一行一行的

lines = sdssplitlen(config,strlen(config)," ",1,&totlines);

for (i = 0; i < totlines; i++) {

/* 跳过注释和空行*/

if (lines[i][0] == '#' || lines[i][0] == '