
Redis Source Code Study: AOF

Preface

Many articles online introduce Redis's AOF mechanism, but they describe the AOF flow only at a high level and do not really analyze the data structures and control mechanisms involved. Last night I went through the 2.8 source in detail, and many of its details are worth digging into, in particular the list *aof_rewrite_buf_blocks structure. Reading the source carefully shows that the understanding gained from online articles alone is one-sided; the best way to learn is still to work through it yourself...
Original article: http://blog.csdn.net/ordeder/article/details/39271543
The simplified AOF flow the author refers to (the comment above rewriteAppendOnlyFileBackground() in aof.c) is:
 * 1) The user calls BGREWRITEAOF.
 * 2) Redis calls this function, which forks():
 *    2a) the child rewrites the append only file into a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finishes '2a', it exits.
 * 4) The parent traps the exit code and, if it is ok, appends the
 *    data accumulated in server.aof_rewrite_buf to the temp file, and
 *    finally rename(2)s the temp file to the actual file name.
 *    The new file is then reopened as the new append only file. Profit!

AOF flow: following the source, the AOF machinery as a whole performs the operations below.
Main functions:
// Function 1: append the command to server.aof_buf
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
// Function 2: fork a child process that dumps the whole Redis dataset as AOF commands
int rewriteAppendOnlyFileBackground(void);
// Function 3: iterate over server.db[] (16 databases by default), writing each object to the temp file tmpfile on disk
int rewriteAppendOnlyFile(char *filename);
// Function 4: persist the contents of server.aof_buf
void flushAppendOnlyFile(int force);
// Function 5: append the contents of server.aof_rewrite_buf_blocks to tmpfile, then replace the AOF file with it
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
1 AOF day-to-day command append:
1.1. Redis handles a file event: it executes the user command and caches that command in server.aof_buf {Function 1} (see the RESP sketch after this list).
1.2. Redis runs the time event serverCron: depending on server.aof_flush_postponed_start, {Function 4}
1.2.1. redisServer.aof_buf is written to the file server.aof_fd.
1.2.2. When that file is fsync'ed to disk is governed by one of three policies:
AOF_FSYNC_EVERYSEC   call fsync() once per second
AOF_FSYNC_ALWAYS     call fsync() right after each write
anything else        leave it to the operating system
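To make step 1.1 concrete, here is a minimal standalone sketch (my own code, not Redis source; resp_encode() is a made-up helper with no overflow checks) of the RESP multi-bulk form that catAppendOnlyGenericCommand() builds for a command such as SET mykey hello before Function 1 appends it to server.aof_buf:

/* Sketch only: encode a command the way it ends up in the AOF. */
#include <stdio.h>
#include <string.h>

/* Encode argv[0..argc-1] as a RESP multi-bulk string: "*N\r\n$len\r\narg\r\n"... */
static size_t resp_encode(char *dst, size_t cap, int argc, const char **argv) {
    size_t off = (size_t)snprintf(dst, cap, "*%d\r\n", argc);
    for (int i = 0; i < argc; i++) {
        off += (size_t)snprintf(dst + off, cap - off, "$%zu\r\n%s\r\n",
                                strlen(argv[i]), argv[i]);
    }
    return off;
}

int main(void) {
    const char *argv[] = {"SET", "mykey", "hello"};
    char buf[256];
    size_t len = resp_encode(buf, sizeof(buf), 3, argv);
    fwrite(buf, 1, len, stdout);   /* *3 $3 SET $5 mykey $5 hello, each token ending in \r\n */
    return 0;
}

Running it prints *3, $3, SET, $5, mykey, $5, hello, each token terminated by \r\n; this is the same textual form that accumulates in aof_buf and ultimately ends up in the AOF file.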
2 AOF log rewrite (compaction):
2.1. Redis runs the time event serverCron: {Functions 2-3}
2.1.1. It starts a background AOF child which, from the in-memory Redis data (server.db[], 16 databases by default), generates a command set that can rebuild the database and writes it to the temp file tmpfile.
2.2. Redis handles file events:
while executing a user command, {Function 1}
2.2.1. the command is cached in redisServer.aof_buf;
2.2.2. and, at the same time, in server.aof_rewrite_buf_blocks (see the block-list sketch after this list).
2.3. Redis runs the time event serverCron:
2.3.1 {Function 4} While the AOF child has not yet finished, step 1.2 keeps running as usual: aof_buf is written to aof_fd (business as usual).
2.3.2 Once wait3() reports that the AOF child has exited: {Function 5}
2.3.2.1 the contents of server.aof_rewrite_buf_blocks are appended to tmpfile;
2.3.2.2 tmpfile replaces the existing AOF file and server.aof_fd is reset (a hedged sketch of this finalization appears after the source listings below).
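Since the preface singles out list *aof_rewrite_buf_blocks, here is a simplified sketch of the idea behind it and aofRewriteBufferAppend(): while the rewrite child runs, the diff is accumulated in a linked list of fixed-size blocks rather than in one ever-growing sds string. The type and function names below are my own illustration (no OOM checks), not the real Redis code, which chains aofrwblock nodes on an adlist:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE (10*1024*1024)   /* Redis 2.8 uses 10 MB aofrwblock buffers */

typedef struct rewrite_block {
    struct rewrite_block *next;
    size_t used;                    /* bytes already stored in buf */
    char buf[BLOCK_SIZE];
} rewrite_block;

typedef struct { rewrite_block *head, *tail; } rewrite_buf;

/* Fill the tail block first, allocating and chaining a new block only when the
 * current one is full -- the same pattern the real aofRewriteBufferAppend()
 * applies to its list of aofrwblock nodes. */
static void rewrite_buf_append(rewrite_buf *rb, const char *s, size_t len) {
    while (len) {
        rewrite_block *b = rb->tail;
        if (b == NULL || b->used == BLOCK_SIZE) {
            b = calloc(1, sizeof(*b));
            if (rb->tail) rb->tail->next = b; else rb->head = b;
            rb->tail = b;
        }
        size_t space = BLOCK_SIZE - b->used;
        size_t n = len < space ? len : space;
        memcpy(b->buf + b->used, s, n);
        b->used += n;
        s += n;
        len -= n;
    }
}

int main(void) {
    rewrite_buf rb = {NULL, NULL};
    const char *cmd = "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n";
    rewrite_buf_append(&rb, cmd, strlen(cmd));
    printf("tail block holds %zu bytes\n", rb.tail->used);
    return 0;
}

Appending always fills the tail block before allocating a new one, so memory grows in 10 MB steps and no large buffer ever has to be reallocated or copied while the child is running.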
The relationship between these functions and the data structures is illustrated by a diagram in the original post.
Source code:

struct redisServer {
    ...
    /* AOF persistence */
    int aof_state;                  /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
    int aof_fsync;                  /* Kind of fsync() policy (every write | every second | left to the OS) */
    char *aof_filename;             /* Name of the AOF file */
    int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */
    off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */
    off_t aof_current_size;         /* AOF current size. */
    int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates
                                       (i.e. whether a background AOF child still needs to be started). */
    pid_t aof_child_pid;            /* PID if rewriting process */
    list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite.
                                       Commands Redis executes while the AOF rewrite child is running are
                                       stored here; aof_buf is of course still used as usual, the two do
                                       not conflict. */
    sds aof_buf;                    /* AOF buffer, written before entering the event loop */
    int aof_fd;                     /* File descriptor of currently selected AOF file */
    int aof_selected_db;            /* Currently selected DB in AOF */
    time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
    time_t aof_last_fsync;          /* UNIX time of last fsync() */
    time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
    time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */
    int aof_lastbgrewrite_status;   /* REDIS_OK or REDIS_ERR */
    unsigned long aof_delayed_fsync;   /* delayed AOF fsync() counter */
    int aof_rewrite_incremental_fsync; /* fsync incrementally while rewriting? */
    ...
};

//////////////////////////////////////////////////////////////////////////////////

/* call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    long long dirty, start = ustime(), duration;
    int client_old_flags = c->flags;
    ...
    /* Execute the user command */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    dirty = server.dirty;
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    ...
    /* Propagate the user command so it can be backed up in the AOF */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* If the dict this command operates on differs from the one used by the
     * previous command, Redis must emit a SELECT command into the AOF to
     * switch to the current dict. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    /* Depending on the command, build its textual (RESP) form and write the
     * result into the temporary buf. */
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    /* If the user enabled AOF, append the command's buf to the tail of the
     * server.aof_buf buffer. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process is done with its work we
     * can append the differences to the new append only file. */
    /* If a child is currently rewriting the AOF log (i.e. scanning the Redis
     * database and rebuilding the log from the data), also append the
     * command's buf to server.aof_rewrite_buf_blocks, the memory dedicated to
     * recording every operation Redis handles while the rewrite is running. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

////////////////////////////////////////////////////////////////////////////////////////

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* We take a cached value of the unix time in the global state because
     * with virtual memory and aging we would otherwise have to store the
     * current time in objects at every object access, and accuracy is not
     * needed. Accessing a global var is faster than calling time(NULL). */
    /* Cache the system time... */
    server.unixtime = time(NULL);
    server.mstime = mstime();
    ...
    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    /* Start the child process that rebuilds (compacts) the AOF log. The
     * background AOF child scans the server.db[] data, generates the commands
     * that can rebuild the current database, and writes them to the temp file
     * tmpfile. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    /* A background AOF child finished: append the new commands Redis executed
     * while the child was building the AOF log (recorded in
     * server.aof_rewrite_buf_blocks) to the tmpfile the child produced, then
     * rename tmpfile to server.aof_filename, replacing the old AOF. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            updateDictResizePolicy();
        }
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
        /* No background child is running, so check whether an AOF or RDB
         * child should be started... */
        ...
    }

    /* If we postponed an AOF buffer flush, let's try to do it every time the
     * cron function is called. */
    /* Flush server.aof_buf (which caches the commands Redis executed recently)
     * to the AOF file on disk. The fsync policies are:
     *   - call fsync() after every operation to persist the command, or
     *   - call fsync() on aof_buf roughly once per second, or
     *   - never call fsync() and let the OS decide when to flush. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    ...
    server.cronloops++;
    return 1000/server.hz;
}

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF.
 * 2) Redis calls this function, which forks():
 *    2a) the child rewrites the append only file into a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finishes '2a', it exits.
 * 4) The parent traps the exit code and, if it is ok, appends the
 *    data accumulated in server.aof_rewrite_buf to the temp file, and
 *    finally rename(2)s the temp file to the actual file name.
 *    The new file is then reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1) return REDIS_ERR;
    start = ustime();
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,
                "Can't rewrite append only file in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,
            "Background append only file rewriting started by pid %d",childpid);
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge.
         */
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }

    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;

            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);

            expiretime = getExpire(db,&key);

            /* If this key is already expired skip it */
            if (expiretime != -1 && expiretime < now) continue;

            /* Save the key and associated value */
            if (o->type == REDIS_STRING) {
                /* Emit a SET command */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                /* Key and value */
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkObject(&aof,o) == 0) goto werr;
            } else if (o->type == REDIS_LIST) {
                if (rewriteListObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_SET) {
                if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_ZSET) {
                if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
            } else if (o->type == REDIS_HASH) {
                if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
            } else {
                redisPanic("Unknown object type");
            }
            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
            }
        }
        dictReleaseIterator(di);
    }

    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    aof_fsync(fileno(fp));
    fclose(fp);

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generated DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    if (sdslen(server.aof_buf) == 0) return;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

    /* Decide whether it is time to flush the commands cached in
     * server.aof_buf into the write buffer of the server.aof_fd file. */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
            /* Otherwise fall through, and go write since we can't wait
             * over two seconds. */
            server.aof_delayed_fsync++;
            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    /* If you are following this code path, then we are going to write so
     * reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    /* We want to perform a single write. This should be guaranteed atomic
     * at least if the filesystem we are writing is a real physical one.
     * While this will save us against the server being killed I don't think
     * there is much to do about the whole server stopping for power problems
     * or alike */
    /* Write the commands Redis executed recently (stored in server.aof_buf)
     * into the file (server.aof_fd). Note that writing to the file does not
     * guarantee the data reaches the disk immediately, because this is a
     * buffered write. When the commands in the file's write buffer are
     * fsync'ed to disk depends on the user's setting (see below). */
    nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    if (nwritten != (signed)sdslen(server.aof_buf)) {
        /* Ooops, we are in troubles. The best thing to do for now is
         * aborting instead of giving the illusion that everything is
         * working as expected. */
        ...
        exit(1);
    }
    server.aof_current_size += nwritten;

    /* Re-use AOF buffer when it is small enough. The maximum comes from the
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* aof_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
    /* else: the fd's write buffer will be flushed whenever the OS decides to
     * (we leave it to fate). */
}
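The listings above stop short of backgroundRewriteDoneHandler() (Function 5). As a rough sketch of the parent-side finalization described in steps 2.3.2.1 and 2.3.2.2 (my own simplified code, not the Redis function; the rewrite buffer is reduced to a flat byte array for brevity, and the demo file names are invented):

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Step 2.3.2.1: append the commands buffered while the rewrite child ran.
 * Step 2.3.2.2: atomically swap the temp file in as the live AOF. */
int finalize_rewrite(const char *tmpfile, const char *aof_filename,
                     const char *diff, size_t difflen) {
    int fd = open(tmpfile, O_WRONLY | O_APPEND);
    if (fd == -1) return -1;

    /* Append the diff accumulated during the rewrite and make it durable. */
    if (write(fd, diff, difflen) != (ssize_t)difflen) { close(fd); return -1; }
    if (fsync(fd) == -1) { close(fd); return -1; }
    close(fd);

    /* rename(2) is atomic on POSIX filesystems, so a reader sees either the
     * old AOF or the complete new one, never a half-written file. */
    return rename(tmpfile, aof_filename) == 0 ? 0 : -1;
}

int main(void) {
    const char diff[] = "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n";
    /* Assumes the rewrite child already produced "temp-rewriteaof-demo.aof". */
    return finalize_rewrite("temp-rewriteaof-demo.aof", "appendonly.aof",
                            diff, sizeof(diff)-1) == 0 ? 0 : 1;
}

The real handler also updates server.aof_fd and the related bookkeeping and logs the outcome, but the append-then-rename(2) pattern above is what makes the file swap atomic.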