当前位置: 首页 > news >正文

网站防护找谁做百度收录提交网址

网站防护找谁做,百度收录提交网址,情感视频素材网站,稳稳在哪个网站做的消防直播目录 AOF日志 1. 持久化——命令写入到AOF文件 写到用户缓冲区 AOF的触发入口函数——propagate 具体的实现逻辑——feedAppendOnlyFile 从用户缓冲区写入到AOF文件(磁盘) 函数write、fsync、fdatasync Redis的线程池 AOF文件的同步策略 触发的入口函数——…

目录

AOF日志

1. 持久化——命令写入到AOF文件

写到用户缓冲区

AOF的触发入口函数——propagate

 具体的实现逻辑——feedAppendOnlyFile

从用户缓冲区写入到AOF文件(磁盘)

函数write、fsync、fdatasync

Redis的线程池

AOF文件的同步策略

触发的入口函数——flushAppendOnlyFile

2. AOF重写 

AOF 重写的2个触发时机

用户发送 bgrewriteaof 命令

在定时函数serverCron中触发

父子进程使用pipe进行通信 

两个缓冲区——重写缓冲 和 差异缓冲

重写缓冲

差异缓冲

rewriteAppendOnlyFileBackground的实现

执行重写过程的函数——rewriteAppendOnlyFile

父进程监听子进程结束, AOF 重写收尾

在定时函数serverCron中监听

主进程对AOF重写收尾——backgroundRewriteDoneHandler

3. Redis重启,AOF 文件加载


Redis是把数据储存在内存的键值数据库,而服务器一旦宕机,那内存中的数据将全部丢失。像MySQL那样,是有宕机后数据恢复机制的。那Redis也是有的,其有两种方式:AOF和RDB。该文章讲解AOF

AOF日志

MySQL是使用redo log(重做日志)来进行宕机恢复的。其是使用了写前日志(Write Ahead Log,WAL),即是在实际写数据前,先把修改的数据写到日志文件中,方便出故障时候进行恢复。

而AOF正好相反,是写后日志即是先执行命令把数据写到内存,之后再把该操作记录到日志中。这是个文本日志,不是二进制文件。

那该日志主要有3个操作:

  • AOF持久化(同步):客户端向Redis服务器发送命令,这些命令会被存储到AOF缓冲区中,并随后会持久化到磁盘文件中
  • AOF重写:随着写入的内容越来越多,就会占用大量的磁盘空间,并且Redis重启时候需要按照顺序执行AOF中的命令,这样时间就比较长,所以Redis 会定期重写 AOF 日志,以达到文件瘦身的效果和缩短重启恢复所需的时间。
  • 重启数据恢复:Redis重启后,通过AOF来进行数据恢复

1. 持久化——命令写入到AOF文件

写到用户缓冲区

首先,写入到AOF的命令是先存储在一个AOF缓冲区。

struct redisServer {.........sds aof_buf;      /* AOF buffer, written before entering the event loop */
};

客户端发送的命令转为RESP协议格式的字符串,然后追加到已有的字符串后面,这些都是存储在aof_buf中。

AOF的触发入口函数——propagate

单线程情况下,其函数被调用的流程:readQueryFromClient——>processInputBuffer——>processCommandAndResetClient——>processCommand——> call(client *c, int flags) ——>propagate

void call(client *c, int flags) {/* Call the command. */c->cmd->proc(c);........................// 入参的 flags 设置了 CMD_CALL_PROPAGATE 标识, 表示当前的命令需要传播// 同时对应的客户端内部的标识不是 CLIENT_PREVENT_PROP (客户端的命令阻止传播)if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP){int propagate_flags = PROPAGATE_NONE;if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);// 当前的客户端设置了需要强制同步传播,或者设置了 需要强制 AOF 传播if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;// 与客户端c的flags对比,若是符合条件,取消 命令传播标识的repl或者aofif (c->flags & CLIENT_PREVENT_REPL_PROP || !(flags & CMD_CALL_PROPAGATE_REPL))propagate_flags &= ~PROPAGATE_REPL;if (c->flags & CLIENT_PREVENT_AOF_PROP || !(flags & CMD_CALL_PROPAGATE_AOF))propagate_flags &= ~PROPAGATE_AOF;//  命令传播标识 不为 none, 且当前的命令不是模块命令if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))// 处理aof 和 复制给副本            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);}..................................
}
// 将命令写到aof 文件,并将命令发送给副本
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags)
{if (!server.replication_allowed)return;// AOF 开启了, 同时命令传播标识为 需要 AOF 传播if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)feedAppendOnlyFile(cmd,dbid,argv,argc);     // 将当前的命令保存到 AOF 缓冲区..................  
}

 具体的实现逻辑——feedAppendOnlyFile

该函数就是把命令写入到aof缓冲区。

  1. 创建一个SDS对象buf,用户把命令写入到该对象。判断该命令使用的数据库号是否是用户选择的数据库号,若不是就需要在aof文件中添加选择数据库。
  2. 把命令写入到buf。
    1. 对于 EXPIREEXPIREAT 和 PEXPIRE 将其转换为 PEXPIREAT 特殊处理。
    2. 对于带 EXPX 参数的 SET 命令特殊处理,主要涉及过期时间的处理;
    3. 对于其它命令,调用 catAppendOnlyGenericCommand 按照 RESP 协议组装命令,并将其暂存至 buf;
  3. 如果启用 AOF 日志,则将 buf 中暂存的命令追加到 AOF缓冲区server.aof_buf。
  4. 如果存在正在重写 AOF 的子进程,则将命令追加到 AOF 重写缓冲区server.aof_rewrite_buf_blocks
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {sds buf = sdsempty();//该命令写入的数据库和用户选择的数据库不一致的话,需要在aof文件添加一段选择数据库的记录if (dictid != server.aof_selected_db) {char seldb[64];snprintf(seldb,sizeof(seldb),"%d",dictid);// 拼接出一个 select 数据库号 的语句, 这个语句是遵守 RESP 协议buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",(unsigned long)strlen(seldb),seldb);server.aof_selected_db = dictid;}//这三个命令, 在 AOF 保存的时候, 都会转为 expireat key 具体的过期时间 (单位毫秒) 的格式存入到 AOF 文件中if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||cmd->proc == expireatCommand) {// 转为过期对应的文本, 同时追加到 buf 中buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);} else if (cmd->proc == setCommand && argc > 3) {//带 EX、PX 参数的 SET 命令,特殊处理, set key value ex seconds, set key value px millisecondsrobj *pxarg = NULL;if (!strcasecmp(argv[3]->ptr, "px")) {    //过期时间是毫秒的pxarg = argv[4];}if (pxarg) {    //毫秒的robj *millisecond = getDecodedObject(pxarg);long long when = strtoll(millisecond->ptr,NULL,10);when += mstime();decrRefCount(millisecond);robj *newargs[5];newargs[0] = argv[0];newargs[1] = argv[1];newargs[2] = argv[2];newargs[3] = shared.pxat;newargs[4] = createStringObjectFromLongLong(when);// 往 buf 中追加 set 命令buf = catAppendOnlyGenericCommand(buf,5,newargs);// 创建的对象手动修改引用计数, 便于内存回收decrRefCount(newargs[4]);} else {    //秒过期的buf = catAppendOnlyGenericCommand(buf,argc,argv);}} else {// 其他的命令直接转为 RESP 协议的字符串进行追加buf = catAppendOnlyGenericCommand(buf,argc,argv);}//将组装好的命令追加到 aof_bufif (server.aof_state == AOF_ON)server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));// CHILD_TYPE_AOF表明后台正在进行重写,那么将命令再追加一份到重写缓冲区中,以便我们记录重写时 AOF 文件和当前数据库的差异if (server.child_type == CHILD_TYPE_AOF)aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));sdsfree(buf);
}

从用户缓冲区写入到AOF文件(磁盘)

函数write、fsync、fdatasync

  • write只是将数据保存到系统缓冲区或者用户缓冲区,还没有真正落入到磁盘中的
  • fsync是真正地把数据写入到磁盘,即是把缓冲区中的数据落入磁盘。
    • POSIX 标准定义的 fsync 函数在文件元数据(metadata,例如 st_sizest_atimest_mtime 等)变脏时,会将所有元数据同步到磁盘。由于每次同步都必定导致时间戳的改变,而且文件内容和文件元数据通常存储在磁盘上的不同位置,因此每次调用 fsync 至少需要两次随机磁盘 I/O
  • 为此,Linux 平台提供了一个 fdatasync 函数。该函数仅在必要时才将元数据同步到磁盘(文件读写时间戳等信息的改变不会实时落盘),大大降低了元数据同步的频率。
    • 举例来说,文件的尺寸(st_size)如果变化,是需要立即同步的,否则OS一旦崩溃,即使文件的数据部分已同步,由于metadata没有同步,依然读不到修改的内容。而最后访问时间(atime)/修改时间(mtime)是不需要每次都同步的,只要应用程序对这两个时间戳没有苛刻的要求,基本无伤大雅

 Redis 通过条件编译,将 Linux 平台的 redis_fsync 定义成了 fdatasync,而在其它类 Unix 平台上依旧是 fsync

#ifdef __linux__
#define redis_fsync fdatasync
#else
#define redis_fsync fsync
#endif

Redis的线程池

真正写入到磁盘的是使用fsync函数,那说明该函数是相对比较耗时的。Redis维护了一个线程池,就是用来处理一些比较耗时的操作。

那么,AOF缓冲区写入到AOF文件(存入到磁盘)过程中,会先通过write将数据写入到系统缓存,然后根据当前的AOF保存策略,决定是否需要把fsync函数的执行交给线程池。

AOF文件的同步策略

  • no:不进行同步,每个写命令执行完后,只是先把记录写到AOF文件中的内存缓冲区中,由操作系统决定合适将缓冲区内存写回磁盘。
  • always:每次write后,都会立即执行fsync,这种就是在主线程中执行fsync。
  • everysec:每次write后,不会立即执行fsync,理论是每秒执行一次fsync,同时内部将fysnc的执行交给线程池去处理

触发的入口函数——flushAppendOnlyFile

将缓冲区中的数据写入到aof文件的函数是flushAppendOnlyFile。

在Redis中有5处会调用该函数

  1. 通过命令动态关闭AOF功能时,会进行一次保存,即是发送命令将appendonly yes设置为appendonly no。
  2. 在Redis正常关闭之前,会执行该函数。
  3. 在事件循环中的beforesleep函数中会调用一次,这个是AOF功能的主要的保存入口
  4. Redis的定时函数serverCron(默认100毫秒执行一次)
  5. 定时函数serverCron,判断上次AOF写入状态,失败就执行一次该函数。

 定时函数serverCron关于这部分的代码。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {..................//上次的 AOF 写文件时, 没有执行, 将 aof_flush_postponed_start 设置为true, 表示需要延迟处理//存在延迟的AOF落盘操作,在这里完成if (server.aof_state == AOF_ON && server.aof_flush_postponed_start)flushAppendOnlyFile(0);run_with_period(1000) {//上次的写文件失败,即是fync失败if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)flushAppendOnlyFile(0);}...................
}

存在延迟的AOF落盘操作

比如:主线程在执行flushAppendOnlyFile中调用write后,提交一个任务给后台线程,假设此时数据量很大,fsync需要执行较长时间。而主线程又执行到了flushAppendOnlyFile,而上一次的fsync函数还没有执行完,Redis会选择延迟执行,将Server成员变量aof_flush_postponed_start设置为当前时间,就结束该函数。

所以在执行定时任务时候,会判断该变量是否>0,若是,会再执行flushAppendOnlyFile,这个就是AOF同步延迟到定时函数处执行。

但是,延迟到定时任务处触发, 还是无法保证后台线程一定执行完上次的 fsync。所以该函数会根据当前的时间和变量储存的时间进行判断,若是在2s内,就不做任何处理,退出该函数;而大于2s,立即执行AOF缓冲区写入文件的逻辑。

flushAppendOnlyFile的实现

  1. 如果AOF缓冲区为空, 并且AOF策略是everysec,同步到磁盘的内容大小不等于当前AOF文件的内容大小,当前时间 > 上次AOF fsync的时间,同时当前没有正在运行的bio后台任务,则尝试执行fsync。
  2. 如果策略是everysec,且后台存在正在同步的bio线程,则判断aof_flush_postponed_start是否为0:
    1. 若是0,表示之前没有延迟落盘任务,所以就只记录当前的时间戳给aof_flush_postponed_start并退出。
    2. 若是不为0,但判断距离aof_flush_postponed_start是否已经过去2s,若是就增加server.aof_delayed_fsync 计数,强制后续的磁盘同步流程
  3. 调用aofWrite将AOF缓冲区中的数据写入到系统内核缓冲区(这时是还没有使用fsync),若是写入到系统的数据长度不等于当前 AOF 缓冲区的长度, 需要进行异常处理
  4. 如果aof_buf的总空间小于4kb,则清空buffer内容并重新使用该缓冲区,否则创建一个新的。
// AOF 缓冲区数据写入文件
// 当持久策略被设置为 everysec, 实际上会由后台线程进行处理, 那么当前这次刷新写入时, 后台可能有线程还在写入, 所以这时的操作会延迟写入 
//参数force 1:表示无视后台的 fsync, 直接写入, 0: 表示可以延迟, 一般 AOF 过程都是 0
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {ssize_t nwritten;int sync_in_progress = 0;mstime_t latency;//表示aof缓冲区中没有数据, 就可以结束了,但是Redis中有一些极端情况,不会结束,当前学习可以不用了解,后序熟悉该代码了再回头看if (sdslen(server.aof_buf) == 0) {if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.aof_fsync_offset != server.aof_current_size &&server.unixtime > server.aof_last_fsync &&!(sync_in_progress = aofFsyncInProgress())) {goto try_fsync;} else {return;}}// 持久策略为每秒 fsync 一次, 判断后台的线程池是否有线程在执行 fsync if (server.aof_fsync == AOF_FSYNC_EVERYSEC)sync_in_progress = aofFsyncInProgress();//该返回值为true,表示当前有BIO线程在执行 fsync // 持久策略为每秒 fsync 一次, 同时不需要强制写入文件if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {// 当前有 BIO 线程在执行 fsyncif (sync_in_progress) {if (server.aof_flush_postponed_start == 0) { // 0 表示当前没有延迟执行//当前有后台线程在执行 fsync, 那么先延长一下, 设置aof_flush_postponed_start 为当前时间server.aof_flush_postponed_start = server.unixtime;return;//若之前是偶延迟执行,然后又进入了该函数(一般是执行定时函数触发),那这时后台还在执行fsync,但是当前时间和上一次设置的延迟时间小于2s,可以接受,就暂时不做处理} else if (server.unixtime - server.aof_flush_postponed_start < 2) {/* We were already waiting for fsync to finish, but for less* than two seconds this is still ok. Postpone again. */return;}//到了这一步表示线程池中有请求 fsync 的任务, 同时上次延迟距离当前时间超过 2 秒了server.aof_delayed_fsync++;    // 延迟 fsync 的次数 + 1serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");}}if (server.aof_flush_sleep && sdslen(server.aof_buf)) {usleep(server.aof_flush_sleep);}latencyStartMonitor(latency);//调用 write 函数将缓冲区中的数据写入到文件 (此时还在系统缓存, 还没写入到磁盘nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));latencyEndMonitor(latency);if (sync_in_progress) {latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);} else if (hasActiveChildProcess()) {latencyAddSampleIfNeeded("aof-write-active-child",latency);} else {latencyAddSampleIfNeeded("aof-write-alone",latency);}latencyAddSampleIfNeeded("aof-write",latency);//将缓冲区中的数据 write 到系统后, 可以把延迟执行设置为 0   server.aof_flush_postponed_start = 0;                 // 写入到系统的数据长度不等于当前 AOF 缓冲区的长度, 进入异常处理if (nwritten != (ssize_t)sdslen(server.aof_buf)) {static time_t last_write_error_log = 0;if (nwritten == -1) {    // -1, 没有写入任何数据, 就直接失败了server.aof_last_write_errno = errno;}} else {// 大于 -1 但是不等于缓冲区的大小, 写入成功了一部分, if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {//写错误日志..............}} else {/* If the ftruncate() succeeded we can set nwritten to* -1 since there is no longer partial data into the AOF. */nwritten = -1;}server.aof_last_write_errno = ENOSPC;}// 同步策略为 alwaysif (server.aof_fsync == AOF_FSYNC_ALWAYS) {// 这种情况无法处理了, 已经告知客户端写入成功了, 但是当前写入失败了, 直接退出程序。serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");exit(1);} else {// 设置上一次写入状态为异常, 在定时器中会判断这个状态, 再次触发 flushAppendOnlyFile server.aof_last_write_status = C_ERR;if (nwritten > 0) {// 更新当前 aof 文件的大小, 同时将缓冲区中这部分大小的数据移除// 表示这部分写入成功了, 剩余部分下次调用继续server.aof_current_size += nwritten;sdsrange(server.aof_buf,nwritten,-1);}return; /* We'll try again on the next call... */}} else {    //写入成功if (server.aof_last_write_status == C_ERR) {server.aof_last_write_status = C_OK;}}server.aof_current_size += nwritten;    // 更新当前 AOF 文件的大小//如果当前 AOF 缓冲区足够小,小于4K,那么重用这个缓存,否则释放 AOF 缓冲区, 然后重新分配一个    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {sdsclear(server.aof_buf);} else {sdsfree(server.aof_buf);server.aof_buf = sdsempty();}try_fsync://aof缓冲区中没有数据,但是有一些特例情况的需要处理的..........
}

try_fsync部分的代码:

void flushAppendOnlyFile(int force) {if (sdslen(server.aof_buf) == 0) {if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.aof_fsync_offset != server.aof_current_size &&server.unixtime > server.aof_last_fsync && !(sync_in_progress = aofFsyncInProgress())) {goto try_fsync;} .......................}..........................
try_fsync:// no-appendfsync-on-rewrite (正在重写, 不执行 fsync) 被设置为 yes//判断是否有运行中的 bio 线程if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())return;if (server.aof_fsync == AOF_FSYNC_ALWAYS) {latencyStartMonitor(latency);//如果 AOF 落盘策略为 always,直接同步if (redis_fsync(server.aof_fd) == -1) {serverLog(LL_WARNING,"Can't persist AOF for fsync error when the ""AOF fsync policy is 'always': %s. Exiting...", strerror(errno));exit(1);}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-fsync-always",latency);server.aof_fsync_offset = server.aof_current_size;//更新 aof_fsync_offset 为当前的 AOF文件大小server.aof_last_fsync = server.unixtime;     // 上次 fsync 为当前的时间} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&server.unixtime > server.aof_last_fsync)) {// 当前没有请求 fsync 的任务在线程池中if (!sync_in_progress) {//提交一个任务,就是向线程池的任务链表中添加任务节点, 最终就是一个后台线程执行一次 redis_fsync 函数aof_background_fsync(server.aof_fd);server.aof_fsync_offset = server.aof_current_size;}server.aof_last_fsync = server.unixtime;}
}

2. AOF重写 

很容易想到的一个情况,文件越写越大。AOF文件是以追加形式逐一记录接受到的写命令的。当一个键值对被多条命令反复修改时,AOF文件会记录相应的多条命令。要是宕机后重启,对同一个key,就需要依次执行AOF文件中对该key的操作命令。但是我们只需要最新的对该key的操作。

所以就有了重写。重写的时候,是根据这个键值对的最新状态,为它生成对应的写入命令。这样一来,一个键值对在重写日志中只使用一条命令即可。在日志恢复时,只执行一条命令,就可以直接完成这个键值对的写入,也方便省时。

AOF 重写的2个触发时机

  • bgrewriteaof 命令被执行。
  • 定时器函数, 定时检查 AOF 文件, 如果满足配置文件里面设置的条件, 就触发AOF重写

用户发送 bgrewriteaof 命令

bgrewriteaof 命令方式对应的逻辑函数为 bgrewriteaofCommand。主要逻辑是:

  1. 如果正在执行重写中了,返回错误提示
  2. 如果正在执行RDB保存,就将 aof_rewrite_scheduled 属性设置为 true, 返回提示后, 结束。之后是通过定时器函数serverCron判断这个状态确定是否需要触发
  3. 否则,调用 rewriteAppendOnlyFileBackground 执行重写
struct redisCommand redisCommandTable[] = {   {"bgrewriteaof",bgrewriteaofCommand,1,"admin no-script",0,NULL,0,0,0,0,0,0},.................
};void bgrewriteaofCommand(client *c) {if (server.child_type == CHILD_TYPE_AOF) {addReplyError(c,"Background append only file rewriting already in progress");} else if (hasActiveChildProcess()) {server.aof_rewrite_scheduled = 1;addReplyStatus(c,"Background append only file rewriting scheduled");} else if (rewriteAppendOnlyFileBackground() == C_OK) {addReplyStatus(c,"Background append only file rewriting started");} else {addReplyError(c,"Can't execute an AOF background rewriting. ""Please check the server logs for more information.");}
}

在定时函数serverCron中触发

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {  .......................// 后台没有进程在 RDB 和 AOF, 同时通过 bgrewriteaof 命令设置了定时刷新重写 AOF  if (!hasActiveChildProcess() && server.aof_rewrite_scheduled) {rewriteAppendOnlyFileBackground();}if (hasActiveChildProcess() || ldbPendingChildren()){    // 后台有进程在 RDB 或者 AOF.......................} else {    // 当前后台 没有进程在 RDB 或者 AOF//省略了一些检查.............//达到了AOF重写的条件:开启了AOF && 后台没有RDB和AOF重写进行 &&// 配置了目前 AOF 文件大小超过上次重写的 AOF 文件的百分比 &&//当前的 AOF 文件大小超过了配置的需要触发重写的最小大小if (server.aof_state == AOF_ON && !hasActiveChildProcess() &&server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size){// 计算当前的文件增长的比例long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1;long long growth = (server.aof_current_size*100/base) - 100;// 超过了就调用 rewriteAppendOnlyFileBackground 进行重写if (growth >= server.aof_rewrite_perc) {rewriteAppendOnlyFileBackground();}}}...........................................
}

所以,都是集中到函数rewriteAppendOnlyFileBackground中处理的

在某时刻,需要AOF文件重写:

  • 那为了不阻塞主线程,那可以fork一个子进程来重写。fork出来的子进程,拥有了和父进程一样的内存数据,子进程就先把这些内存数据写入到一个AOF临时文件。
  • 但是在这个过程中,父进程还是能接受客户端的命令的,所以父子进程需要通讯,而Redis中父子进程是使用管道pipe进行通讯的。

父子进程使用pipe进行通信 

Redis是使用了3个管道。每个管道有2端,所以有6个fd。

struct redisServer {/* AOF pipes used to communicate between parent and child during rewrite. */int aof_pipe_write_data_to_child;int aof_pipe_read_data_from_parent;int aof_pipe_write_ack_to_parent;int aof_pipe_read_ack_from_child;int aof_pipe_write_ack_to_child;int aof_pipe_read_ack_from_parent;...................
}; int aofCreatePipes(void) {int fds[6] = {-1, -1, -1, -1, -1, -1};int j;//int pipe(int pipefd[2]); 成功:0;失败:-1,设置errno,函数调用成功返回r/w两个文件描述符if (pipe(fds) == -1) goto error; /* parent -> children data. */if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */if (pipe(fds+4) == -1) goto error; /* parent -> children ack. *//* Parent -> children data is non blocking. */if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;server.aof_pipe_write_data_to_child = fds[1];server.aof_pipe_read_data_from_parent = fds[0];server.aof_pipe_write_ack_to_parent = fds[3];server.aof_pipe_read_ack_from_child = fds[2];server.aof_pipe_write_ack_to_child = fds[5];server.aof_pipe_read_ack_from_parent = fds[4];server.aof_stop_sending_diff = 0;return C_OK;................
}
  1. aof_pipe_write_data_to_child 和 aof_pipe_read_data_from_parent, 主要是父进程将子进程重写过程中产生的命令同步给子进程(即是同步数据
  2. aof_pipe_write_ack_to_parent 和 aof_pipe_read_ack_from_child, 主要是用于子进程通知父进程停止同步变更命令
  3. aof_pipe_write_ack_to_child 和 aof_pipe_read_ack_from_parent, 主要用于父进程响应子进程的停止同步变更命令的请求

 我们要重点关注aof_pipe_write_data_to_child(写端) aof_pipe_read_data_from_parent(读端)。这两个是传输Redis内存数据的管道fd。

两个缓冲区——重写缓冲差异缓冲

重写缓冲

struct redisServer {list *aof_rewrite_buf_blocks; //重写缓冲区,注意是个链表  /* Hold changes during an AOF rewrite. */...................
};#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block *///AOF 重写缓存列表的节点定义
typedef struct aofrwblock {unsigned long used, free;    //used:已使用的空间,free:剩余的空阿金char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;void aofRewriteBufferReset(void) {if (server.aof_rewrite_buf_blocks)listRelease(server.aof_rewrite_buf_blocks);server.aof_rewrite_buf_blocks = listCreate();listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}

什么时候使用到重写缓冲区?那就是需要进行AOF重写的时候。

将缓冲区中的数据写入到aof的函数是flushAppendOnlyFile。那也是在该函数中,会使用到重写缓冲区。

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {.............................if (server.child_type == CHILD_TYPE_AOF)    /// 如果后台正在进行重写aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));//将命令写入到 AOF 重写缓冲区
}/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {//获取缓冲区列表,是添加在尾部,所以获取尾部listNode *ln = listLast(server.aof_rewrite_buf_blocks);aofrwblock *block = ln ? ln->value : NULL;while(len) {/* If we already got at least an allocated block, try appending* at least some piece into it. */if (block) {    //表明重写缓冲列表已有数据//计算当前节点的剩余空间是否够len长度的数据写入unsigned long thislen = (block->free < len) ? block->free : len;if (thislen) {  /* The current block is not already full. */memcpy(block->buf+block->used, s, thislen);block->used += thislen;block->free -= thislen;s += thislen;len -= thislen;}}// len > 0, 说明还需要空间, 但是当前的节点没有空间了, 需要新建一个节点if (len) { /* First block to allocate, or need another block. */int numblocks;// 分配新的缓存节点, 同时放到列表的尾部block = zmalloc(sizeof(*block));block->free = AOF_RW_BUF_BLOCK_SIZE;block->used = 0;listAddNodeTail(server.aof_rewrite_buf_blocks,block);numblocks = listLength(server.aof_rewrite_buf_blocks);if (((numblocks+1) % 10) == 0) {int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :LL_NOTICE;serverLog(level,"Background AOF buffer size: %lu MB",aofRewriteBufferSize()/(1024*1024));}}}// 注册一个文件事件, 用来将缓冲区的数据写入到 aof_pipe_write_data_to_child 中, //然后在 Pipe 的作用下, 可以同步到 aof_pipe_read_data_from_parentif (!server.aof_stop_sending_diff &&aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0){//这里注意:注册的是 写事件, 写事件就绪的条件是内核空间的缓冲有空,就可以写aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,AE_WRITABLE, aofChildWriteDiffData, NULL);}
}

接着来看看管道fd的写事件回调函数aofChildWriteDiffData

那么当内核缓冲区空间有空闲,就会触发该管道fd的写事件,就会执行aofChildWriteDiffData通过该函数就把重写缓存中的数据写到了管道中,供子进程读取到子进程的差异缓冲中。

//事件回调函数, 把当前的 AOF 缓冲区同步到 aof_pipe_write_data_to_child, 在 Pipe 的作用下间接同步到 aof_pipe_read_data_from_parent
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {listNode *ln;aofrwblock *block;ssize_t nwritten;while(1) {ln = listFirst(server.aof_rewrite_buf_blocks);block = ln ? ln->value : NULL;// 停止同步 或者 重写缓冲区为空,  就需要删除这个 写事件if (server.aof_stop_sending_diff || !block) {aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);return;}if (block->used > 0) {// 把 block 的数据写入到 aof_pipe_write_data_to_childnwritten = write(server.aof_pipe_write_data_to_child,block->buf,block->used);if (nwritten <= 0) return;memmove(block->buf,block->buf+nwritten,block->used-nwritten);block->used -= nwritten;block->free += nwritten;}if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);}
}

差异缓冲

在子进程重写AOF过程中,子进程等待主进程把重写缓冲中的数据通过pipe发送到差异缓冲区。

struct redisServer {sds aof_child_diff;  //子进程的差异缓冲区  /* AOF diff accumulator child side. */te. */...................
};

子进程通过pipe将重写缓冲区中的数据同步到差异缓冲区的函数是aofReadDiffFromParent

ssize_t aofReadDiffFromParent(void) {char buf[65536]; /* Default pipe buffer size on most Linux systems. */ssize_t nread, total = 0;// 将 aof_pipe_read_data_from_parent 中的数据读取到 buf 中while ((nread =read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {// 把buf的数据拼接到aof_child_diff 中        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);  total += nread;}return total;
}

rewriteAppendOnlyFileBackground的实现

了解了Redis中关于AOF重写的两个缓冲区和父子进程通过pipe通讯,那对AOF重写的过程就好理解了。

其具体的细节步骤:

  1. 主进程fork出一个子进程,让子进程来进行AOF重写。fork出来的子进程,拥有了和父进程一样的内存数据 
  2. 子进程将内存中的数据写入到一个AOF临时文件中
  3. 在子进程重写期间,主进程还是会继续将新到达的命令追加写到原AOF,并将这些命令拷贝到重写缓冲,然后通过pipe管道发送给子进程的差异缓冲中。
  4. 子进程处理完内存数据后,就把差异缓冲中的数据追加到临时AOF文件中,之后就禁止主进程发新数据。
  5. 这时,若主进程中的重写缓存中还剩余数据,就把该数据追加到临时AOF文件中,再用临时AOF文件替换旧的AOF,结束。 
int rewriteAppendOnlyFileBackground(void) {pid_t childpid;if (hasActiveChildProcess()) return C_ERR;  //判断当前没有RDB和aof重写  if (aofCreatePipes() != C_OK) return C_ERR;    //创建 Pipe 通道, 用于父子进程之间通信//创建 AOF 子进程if ((childpid = redisFork(CHILD_TYPE_AOF)) == 0) {char tmpfile[256];/* Child */redisSetProcTitle("redis-aof-rewrite");//将自己绑定给某个cpuredisSetCpuAffinity(server.aof_rewrite_cpulist);snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());//这个是重点,  重写AOFif (rewriteAppendOnlyFile(tmpfile) == C_OK) {//子进程重写完成的一些收尾工作, 基本不涉及主流程, 通知父进程过程中子进程修改了多少数据sendChildCowInfo(CHILD_INFO_TYPE_AOF_COW_SIZE, "AOF rewrite");exitFromChild(0);} else {exitFromChild(1);}} else {/* Parent */..............server.aof_rewrite_scheduled = 0;server.aof_rewrite_time_start = time(NULL);server.aof_selected_db = -1;// 清空 redisServer 的 repl_scriptcache_dict 字典和 repl_scriptcache_fifo 这个列表// 和主从复制相关replicationScriptCacheFlush();return C_OK;}return C_OK; /* unreached */
}

执行重写过程的函数——rewriteAppendOnlyFile

子进程执行的rewriteAppendOnlyFile就是真正的AOF重写过程。

这个流程步骤有点多:

  1. 打开aof临时文件,并命名;初始化差异缓冲server.aof_child_diff
  2. 若是启用了混合持久化,则调用rdbSaveRio将 RDB 数据写入 aof 临时文件;否则,调用 rewriteAppendOnlyFileRio() 进行普通的 aof 重写。其内部会遍历字典快照,删除无效数据后,将其封装为 RESP 数据写入临时文件。在遍历的过程中,还会周期性地从管道中拉取增量数据到 aof_child_diff
  3. 将I/O缓冲和内核缓冲中的剩余数据同步到磁盘
  4. 从管道中读取剩余的增量数据,持续一段时间
  5. 停止读取后,发送指令给管道让主进程停止向管道写入。然后等待主进程地 ACK;
  6. 此时父进程不会在同步差异命令过来了, 再做最后一次同步, 将 Pipe 通道中残留的数据同步过来,再次从管道中读取数据。
  7. 将差异缓冲中的数据追加到AOF临时文件中,并再次将AOF临时文件缓冲中的数据同步到磁盘中。
  8. 修改临时文件名,并确认写入成功
int rewriteAppendOnlyFile(char *filename) {rio aof;char tmpfile[256];char byte;// 1snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());FILE *fp = fopen(tmpfile,"w");//..................// 清空 aof_child_diff 的数据, 这个就是 AOF 子进程差异缓冲区server.aof_child_diff = sdsempty();rioInitWithFile(&aof,fp);      // 初始 rio 流, 也就是 IO 流, 用于写入数据到文件// 设定 fsync 触发条件if (server.aof_rewrite_incremental_fsync)rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);startSaving(RDBFLAGS_AOF_PREAMBLE);// 2if (server.aof_use_rdb_preamble) {int error;//混合持久化if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {errno = error;goto werr;}} else {//普通持久化if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;}// 3//fflush:是把C库中的缓冲调用write函数写到磁盘[其实是写到内核的缓冲区]。//fsync:是把内核缓冲刷到磁盘上。if (fflush(fp) == EOF) goto werr;if (fsync(fileno(fp)) == -1) goto werr;int nodata = 0;mstime_t start = mstime();// 4 .从管道中拉取剩余的增量数据,持续一段时间while(mstime()-start < 1000 && nodata < 20) {if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0){nodata++;continue;}nodata = 0; /* Start counting from zero, we stop on N *contiguous*timeouts. */aofReadDiffFromParent();    //从管道读数据到 差异缓冲aof_child_diff}// 5 通知主进程 停止发送增量数据if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)goto werr;// 等待主进程的 ACK,最多等 5sif (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||byte != '!') goto werr;// 此时父进程不会在同步差异命令过来了, 再做最后一次同步, 将 Pipe 通道中残留的数据同步过来// 再次从管道中读取差异数据aofReadDiffFromParent();//获取差异缓冲数据的内容大小size_t bytes_to_write = sdslen(server.aof_child_diff);const char *buf = server.aof_child_diff;long long cow_updated_time = mstime();long long key_count = dbTotalServerKeyCount();// 6 . 将差异缓冲数据写入 aof 文件while (bytes_to_write) {size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);// 将 aof_child_diff 中的数据写入到 aof 文件中if (rioWrite(&aof,buf,chunk_size) == 0)goto werr;bytes_to_write -= chunk_size;buf += chunk_size;/* Update COW info */long long now = mstime();if (now - cow_updated_time >= 1000) {sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");cow_updated_time = now;}}// 7 .将 aof 文件缓冲中的数据,同步到磁盘if (fflush(fp)) goto werr;if (fsync(fileno(fp))) goto werr;if (fclose(fp)) { fp = NULL; goto werr; }fp = NULL;//8 .重命名文件if (rename(tmpfile,filename) == -1) {unlink(tmpfile);stopSaving(0);return C_ERR;}stopSaving(1);return C_OK;werr:if (fp) fclose(fp);unlink(tmpfile);stopSaving(0);return C_ERR;
}

父进程监听子进程结束, AOF 重写收尾

在定时函数serverCron中监听

那是在哪进行监听呢?还是在定时函数serverCron中。定时地检查子进程的状态是否为结束了, 是的话, 执行结束逻辑。在下一次运行 serverCron定时函数时,调用 checkChildrenDone()完成 AOF 收尾工作。checkChildrenDone的核心内容backgroundRewriteDoneHandler函数

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {.................// 检查是否有 RDB 子进程或者 AOF 重写子进程结束了if (hasActiveChildProcess() || ldbPendingChildren()){run_with_period(1000) receiveChildInfo();checkChildrenDone();} else {...........}............................
}/* Receive info data from child. */
void receiveChildInfo(void) {if (server.child_info_pipe[0] == -1) return;size_t cow;monotime cow_updated;size_t keys;double progress;childInfoType information_type;/* Drain the pipe and update child info so that we get the final message. */while (readChildInfo(&information_type, &cow, &cow_updated, &keys, &progress)) {updateChildInfo(information_type, cow, cow_updated, keys, progress);}
}void checkChildrenDone(void) {int statloc = 0;pid_t pid;// wait3可以获取所有的进程是否有一个进程退出状态的, 有的话, 进行彻底的销毁,并返回其进程idif ((pid = waitpid(-1, &statloc, WNOHANG)) != 0) {int exitcode = WIFEXITED(statloc) ? WEXITSTATUS(statloc) : -1;int bysignal = 0;if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {bysignal = SIGUSR1;exitcode = 1;}if (pid == -1) {//打印日志} else if (pid == server.child_pid) {if (server.child_type == CHILD_TYPE_RDB) {backgroundSaveDoneHandler(exitcode, bysignal);} else if (server.child_type == CHILD_TYPE_AOF) {backgroundRewriteDoneHandler(exitcode, bysignal); //自己想哦买噶最终的清理逻辑} if (!bysignal && exitcode == 0) receiveChildInfo();    //获取子进程发送给父进程的信息resetChildState();} else {if (!ldbRemoveChild(pid)) {//打印日志}}/* start any pending forks immediately. */replicationStartPendingFork();}
}

主进程对AOF重写收尾——backgroundRewriteDoneHandler

主进程的backgroundRewriteDoneHandler中主要是4步骤:

  1. 打开子进程刚刚处理完的 aof 临时文件
  2. 将停止发送增量数据期间积累的数据追加到 临时AOF文件
  3.  重命名,替换旧的aof文件
  4. 最后,进行清除工作
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {if (!bysignal && exitcode == 0) {int newfd, oldfd;char tmpfile[256];long long now = ustime();mstime_t latency;latencyStartMonitor(latency);snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.child_pid);// 1 打开子进程刚刚处理完的 aof 临时文件newfd = open(tmpfile,O_WRONLY|O_APPEND);if (newfd == -1) { goto cleanup; }// 2 将停止发送增量数据期间积累的数据追加到 临时AOF文件if (aofRewriteBufferWrite(newfd) == -1) {close(newfd); goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {aof_background_fsync(newfd);} else if (server.aof_fsync == AOF_FSYNC_ALWAYS) {latencyStartMonitor(latency);if (redis_fsync(newfd) == -1) {close(newfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rewrite-done-fsync",latency);}// aof_fd 为当前的 AOF 文件的文件描述符, 等于 -1, 应该是 AOF 功能停用了// 这时为了下面的流程能走下去, 从配置文件中获取到配置的文件名, 尝试打开禁用前的文件if (server.aof_fd == -1) {/* AOF disabled */oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);} else {/* AOF enabled */oldfd = -1; /* We'll set this to the current AOF filedes later. */}latencyStartMonitor(latency);// 3  重命名,替换旧的aof文件if (rename(tmpfile,server.aof_filename) == -1) {close(newfd);if (oldfd != -1) close(oldfd);goto cleanup;}latencyEndMonitor(latency);latencyAddSampleIfNeeded("aof-rename",latency);if (server.aof_fd == -1) {/* AOF disabled, we don't need to set the AOF file descriptor* to this new file, so we can close it. */close(newfd);} else {/* AOF enabled, replace the old fd with the new one. */oldfd = server.aof_fd;server.aof_fd = newfd;server.aof_selected_db = -1; /* Make sure SELECT is re-issued */aofUpdateCurrentSize();server.aof_rewrite_base_size = server.aof_current_size;server.aof_fsync_offset = server.aof_current_size;server.aof_last_fsync = server.unixtime;/* Clear regular AOF buffer since its contents was just written to* the new AOF from the background rewrite buffer. */sdsfree(server.aof_buf);server.aof_buf = sdsempty();}server.aof_lastbgrewrite_status = C_OK;/* Change state from WAIT_REWRITE to ON if needed */if (server.aof_state == AOF_WAIT_REWRITE)server.aof_state = AOF_ON;/* Asynchronously close the overwritten AOF. */if (oldfd != -1) bioCreateCloseJob(oldfd);} else if (!bysignal && exitcode != 0) {server.aof_lastbgrewrite_status = C_ERR;} else {if (bysignal != SIGUSR1)server.aof_lastbgrewrite_status = C_ERR;}cleanup://清除工作aofClosePipes();aofRewriteBufferReset();aofRemoveTempFile(server.child_pid);server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;server.aof_rewrite_time_start = -1;/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */if (server.aof_state == AOF_WAIT_REWRITE)server.aof_rewrite_scheduled = 1;
}

重写失败的话,原来的AOF文件依然是可以使用的。在AOF重写过程中,新来的命令会被写入磁盘两次(主进程写入到旧AOF,子进程是追加到临时AOF),这就会浪费一定的磁盘空间(磁盘便宜大碗,没问题的)。只是在重写过程中,新的命令会被全部储存到子进程的差异缓冲区中,这可能会导致较高的内存占用。

3. Redis重启,AOF 文件加载

从main函数开始,其调用流程 main——>loadDataFromDisk——>loadAppendOnlyFile

其主要流程:

  1. 打开AOF文件
  2. 创建一个虚拟客户端,用于执行AOF中的命令
  3. 根据aof文件中的前导码,判断若是REDIS开头,就调用rdbLoadRio加载RDB的数据;否则将文件指针归零;
  4. 开始循环处理RESP格式的字符串
    1. 按照RESP协议读取命令的参数的个数
    2. 读取命令的每个参数
    3. 根据第一个参数,查询命令表,得到命令
    4. 执行命令
int main(int argc, char **argv) {.........if (!server.sentinel_mode) {    //非哨兵模式loadDataFromDisk();............}
}void loadDataFromDisk(void) {if (server.aof_state == AOF_ON) {loadAppendOnlyFile(server.aof_filename)   }...................
}int loadAppendOnlyFile(char *filename) {struct client *fakeClient;// 1  打开aof文件FILE *fp = fopen(filename,"r");struct redis_stat sb;int old_aof_state = server.aof_state;long loops = 0;off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {server.aof_current_size = 0;server.aof_fsync_offset = server.aof_current_size;fclose(fp);return C_ERR;}/* Temporarily disable AOF, to prevent EXEC from feeding a MULTI* to the same file we're about to read. */server.aof_state = AOF_OFF;// 2  创建虚拟客户端fakeClient = createAOFClient();startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);// 3  根据是否有RDB前导码,再确定处理方式char sig[5]; /* "REDIS" */if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {/* No RDB preamble, seek back at 0 offset. */if (fseek(fp,0,SEEK_SET) == -1) goto readerr;} else {/* RDB preamble. Pass loading the RDB functions. */rio rdb;if (fseek(fp,0,SEEK_SET) == -1) goto readerr;rioInitWithFile(&rdb,fp);//加载rdb内容if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {goto readerr;}}// 4  循环处理Aof文件中剩下的所有命令while(1) {int argc, j;unsigned long len;robj **argv;char buf[128];sds argsds;struct redisCommand *cmd;/* Serve the clients from time to time */if (!(loops++ % 1000)) {loadingProgress(ftello(fp));processEventsWhileBlocked();processModuleLoadingProgressEvent(1);}if (fgets(buf,sizeof(buf),fp) == NULL) {if (feof(fp))break;elsegoto readerr;}if (buf[0] != '*') goto fmterr;if (buf[1] == '\0') goto readerr;// 4.1  按照resp协议读取命令的参数数量argc = atoi(buf+1);if (argc < 1) goto fmterr;argv = zmalloc(sizeof(robj*)*argc);fakeClient->argc = argc;fakeClient->argv = argv;// 4.2  循环读取命令的每个参数for (j = 0; j < argc; j++) {/* Parse the argument len. */char *readres = fgets(buf,sizeof(buf),fp);if (readres == NULL || buf[0] != '$') {fakeClient->argc = j; /* Free up to j-1. */freeFakeClientArgv(fakeClient);if (readres == NULL)goto readerr;elsegoto fmterr;}len = strtol(buf+1,NULL,10);/* Read it into a string object. */argsds = sdsnewlen(SDS_NOINIT,len);if (len && fread(argsds,len,1,fp) == 0) {sdsfree(argsds);fakeClient->argc = j; /* Free up to j-1. */freeFakeClientArgv(fakeClient);goto readerr;}argv[j] = createObject(OBJ_STRING,argsds);/* Discard CRLF. */if (fread(buf,2,1,fp) == 0) {fakeClient->argc = j+1; /* Free up to j. */freeFakeClientArgv(fakeClient);goto readerr;}}// 4.3  根据第一个参数,查询命令表,获取命令cmd = lookupCommand(argv[0]->ptr);if (cmd == server.multiCommand) valid_before_multi = valid_up_to;// 4.4 执行命令fakeClient->cmd = fakeClient->lastcmd = cmd;if (fakeClient->flags & CLIENT_MULTI &&fakeClient->cmd->proc != execCommand){queueMultiCommand(fakeClient);} else {cmd->proc(fakeClient);}/* Clean up. Command code may have changed argv/argc so we use the* argv/argc of the client instead of the local variables. */freeFakeClientArgv(fakeClient);fakeClient->cmd = NULL;if (server.aof_load_truncated) valid_up_to = ftello(fp);if (server.key_load_delay)debugDelay(server.key_load_delay);}if (fakeClient->flags & CLIENT_MULTI) {valid_up_to = valid_before_multi;goto uxeof;}..........................
}

goto部分的代码:

int loadAppendOnlyFile(char *filename) {...................................
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */fclose(fp);freeFakeClient(fakeClient);server.aof_state = old_aof_state;stopLoading(1);aofUpdateCurrentSize();server.aof_rewrite_base_size = server.aof_current_size;server.aof_fsync_offset = server.aof_current_size;return C_OK;readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */if (!feof(fp)) {if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));exit(1);}uxeof: /* Unexpected AOF end of file. */if (server.aof_load_truncated) {serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",(unsigned long long) valid_up_to);if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {if (valid_up_to == -1) {serverLog(LL_WARNING,"Last valid command offset is invalid");} else {serverLog(LL_WARNING,"Error truncating the AOF file: %s",strerror(errno));}} else {/* Make sure the AOF file descriptor points to the end of the* file after the truncate call. */if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",strerror(errno));} else {serverLog(LL_WARNING,"AOF loaded anyway because aof-load-truncated is enabled");goto loaded_ok;}}}if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);exit(1);fmterr: /* Format error. */if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */fclose(fp);exit(1);
}
http://www.dt0577.cn/news/5606.html

相关文章:

  • 火币网站怎么做空搜索引擎营销包括
  • 日本做牛仔裤视频网站国外网页模板
  • 上海网站制作福州十大免费域名
  • 网站的优化与推广分析重庆网站制作公司
  • 巴彦淖尔专业做网站的seo服务商技术好的公司
  • 网站怎么推广效果最好刷百度关键词排名优化
  • 网站开发需要先学数据库么产品推广渠道
  • 智能响应式网站建设搜索引擎下载安装
  • 网页编程培训宁波免费seo在线优化
  • 自助建设wap网站竞价推广是做什么的
  • 网站更换图片之类的怎么做怎么免费搭建自己的网站
  • 怎么样提高网站点击率吉林seo基础
  • 做网站要的软件创新营销方式有哪些
  • 万网网站安装百度搜索风云榜下载
  • wordpress中上传整站正规职业技能培训机构
  • 网站建设和管理办法爱站网站
  • 织梦系统做网站用手机制作自己的网站
  • 网站检测中心seo关键词排名软件
  • 做网站实训目的和意义软文素材库
  • 易语言和网站做交互网络推广有多少种方法
  • 做直播大秀的平台和网站南宁网站推广哪家好
  • 自己的电脑做服务器 并建网站代写新闻稿
  • 做网站选哪家公司企业培训机构排名前十
  • 免费云建站网络营销所学课程
  • 网站切图大图网络推广方案范例
  • 常见的动态网站开发语言汕头自动seo
  • wordpress 主题数据seo外链软件
  • 德州网站建设推广宁波seo外包服务商
  • 石湾网站建设域名注册新网
  • 厦门网页设计公司价格网上seo研究