繼續(xù)學(xué)習(xí)redis源碼下的Data數(shù)據(jù)相關(guān)文件的代碼分析,今天我看的是一個(gè)叫aof的文件,這個(gè)字母是append ONLY file的簡(jiǎn)稱,意味只進(jìn)行追加文件操作。這里的文件追加記錄時(shí)為了記錄數(shù)據(jù)操作的改變記錄,用以異常情況的數(shù)據(jù)恢復(fù)的。類似于之前我說(shuō)的redo,undo日志
繼續(xù)學(xué)習(xí)redis源碼下的Data數(shù)據(jù)相關(guān)文件的代碼分析,今天我看的是一個(gè)叫aof的文件,這個(gè)字母是append ONLY file的簡(jiǎn)稱,意味只進(jìn)行追加文件操作。這里的文件追加記錄時(shí)為了記錄數(shù)據(jù)操作的改變記錄,用以異常情況的數(shù)據(jù)恢復(fù)的。類似于之前我說(shuō)的redo,undo日志的作用。我們都知道,redis作為一個(gè)內(nèi)存數(shù)據(jù)庫(kù),數(shù)據(jù)的每次操作改變是先放在內(nèi)存中,等到內(nèi)存數(shù)據(jù)滿了,在刷新到磁盤文件中,達(dá)到持久化的目的。所以aof的操作模式,也是采用了這樣的方式。這里引入了一個(gè)block塊的概念,其實(shí)就是一個(gè)緩沖區(qū)塊。關(guān)于塊的一些定義如下:
/* AOF的下面的一些代碼都用到了一個(gè)簡(jiǎn)單buffer緩存塊來(lái)進(jìn)行存儲(chǔ),存儲(chǔ)了數(shù)據(jù)的一些改變操作記錄,等到 緩沖中的達(dá)到一定的數(shù)據(jù)規(guī)模時(shí),在持久化地寫入到一個(gè)文件中,redis采用的方式是append追加的形式,這意味 每次追加都要調(diào)整存儲(chǔ)的塊的大小,但是不可能會(huì)有無(wú)限大小的塊空間,所以redis在這里引入了塊列表的概念, 設(shè)定死一個(gè)塊的大小,超過(guò)單位塊大小,存入另一個(gè)塊中,這里定義每個(gè)塊的大小為10M. */ #define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */ /* 標(biāo)準(zhǔn)的aof文件讀寫塊 */ typedef struct aofrwblock { //當(dāng)前文件塊被使用了多少,空閑的大小 unsigned long used, free; //具體存儲(chǔ)內(nèi)容,大小10M char buf[AOF_RW_BUF_BLOCK_SIZE]; } aofrwblock;
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ /* 在緩沖區(qū)中追加數(shù)據(jù),如果超出空間,會(huì)新申請(qǐng)一個(gè)緩沖塊 */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(server.aof_rewrite_buf_blocks); //定位到緩沖區(qū)的最后一塊,在最后一塊的位置上進(jìn)行追加寫操作 aofrwblock *block = ln ? ln->value : NULL; while(len) { /* If we already got at least an allocated block, try appending * at least some piece into it. */ if (block) { //如果當(dāng)前的緩沖塊的剩余空閑能支持len長(zhǎng)度的內(nèi)容時(shí),直接寫入 unsigned long thislen = (block->free < len) ? block->free : len; if (thislen) { /* The current block is not already full. */ memcpy(block->buf+block->used, s, thislen); block->used += thislen; block->free -= thislen; s += thislen; len -= thislen; } } if (len) { /* First block to allocate, or need another block. */ int numblocks; //如果不夠的話,需要新創(chuàng)建,進(jìn)行寫操作 block = zmalloc(sizeof(*block)); block->free = AOF_RW_BUF_BLOCK_SIZE; block->used = 0; //還要把緩沖塊追加到服務(wù)端的buffer列表中 listAddNodeTail(server.aof_rewrite_buf_blocks,block); /* Log every time we cross more 10 or 100 blocks, respectively * as a notice or warning. */ numblocks = listLength(server.aof_rewrite_buf_blocks); if (((numblocks+1) % 10) == 0) { int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING : REDIS_NOTICE; redisLog(level,"Background AOF buffer size: %lu MB", aofRewriteBufferSize()/(1024*1024)); } } } }
/* Write the append only file buffer on disk. * * Since we are required to write the AOF before replying to the client, * and the only way the client socket can get a write is entering when the * the event loop, we accumulate all the AOF writes in a memory * buffer and write it on disk using this function just before entering * the event loop again. * * About the 'force' argument: * * When the fsync policy is set to 'everysec' we may delay the flush if there * is still an fsync() going on in the background thread, since for instance * on Linux write(2) will be blocked by the background fsync anyway. * When this happens we remember that there is some aof buffer to be * flushed ASAP, and will try to do that in the serverCron() function. * * However if force is set to 1 we'll write regardless of the background * fsync. */ #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */ /* 刷新緩存區(qū)的內(nèi)容到磁盤中 */ void flushAppendOnlyFile(int force) { ssize_t nwritten; int sync_in_progress = 0; mstime_t latency; if (sdslen(server.aof_buf) == 0) return; if (server.aof_fsync == AOF_FSYNC_EVERYSEC) sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. * If the fsync is still in progress we can try to delay * the write for a couple of seconds. */ if (sync_in_progress) { if (server.aof_flush_postponed_start == 0) { /* No previous write postponinig, remember that we are * postponing the flush and return. */ server.aof_flush_postponed_start = server.unixtime; return; } else if (server.unixtime - server.aof_flush_postponed_start < 2) { /* We were already waiting for fsync to finish, but for less * than two seconds this is still ok. Postpone again. */ return; } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ //在進(jìn)行寫入操作的時(shí)候,還監(jiān)聽了延遲 latencyStartMonitor(latency); nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); latencyEndMonitor(latency); /* We want to capture different events for delayed writes: * when the delay happens with a pending fsync, or with a saving child * active, and when the above two conditions are missing. * We also use an additional event name to save all samples which is * useful for graphing / monitoring purposes. */ if (sync_in_progress) { latencyAddSampleIfNeeded("aof-write-pending-fsync",latency); } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) { latencyAddSampleIfNeeded("aof-write-active-child",latency); } else { latencyAddSampleIfNeeded("aof-write-alone",latency); } latencyAddSampleIfNeeded("aof-write",latency); /* We performed the write so reset the postponed flush sentinel to zero. */ server.aof_flush_postponed_start = 0; if (nwritten != (signed)sdslen(server.aof_buf)) { static time_t last_write_error_log = 0; int can_log = 0; /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */ if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) { can_log = 1; last_write_error_log = server.unixtime; } /* Lof the AOF write error and record the error code. */ if (nwritten == -1) { if (can_log) { redisLog(REDIS_WARNING,"Error writing to the AOF file: %s", strerror(errno)); server.aof_last_write_errno = errno; } } else { if (can_log) { redisLog(REDIS_WARNING,"Short write while writing to " "the AOF file: (nwritten=%lld, " "expected=%lld)", (long long)nwritten, (long long)sdslen(server.aof_buf)); } if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { if (can_log) { redisLog(REDIS_WARNING, "Could not remove short write " "from the append-only file. Redis may refuse " "to load the AOF the next time it starts. " "ftruncate: %s", strerror(errno)); } } else { /* If the ftrunacate() succeeded we can set nwritten to * -1 since there is no longer partial data into the AOF. */ nwritten = -1; } server.aof_last_write_errno = ENOSPC; } /* Handle the AOF write error. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* We can't recover when the fsync policy is ALWAYS since the * reply for the client is already in the output buffers, and we * have the contract with the user that on acknowledged write data * is synched on disk. */ redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..."); exit(1); } else { /* Recover from failed write leaving data into the buffer. However * set an error to stop accepting writes as long as the error * condition is not cleared. */ server.aof_last_write_status = REDIS_ERR; /* Trim the sds buffer if there was a partial write, and there * was no way to undo it with ftruncate(2). */ if (nwritten > 0) { server.aof_current_size += nwritten; sdsrange(server.aof_buf,nwritten,-1); } return; /* We'll try again on the next call... */ } } else { /* Successful write(2). If AOF was in error state, restore the * OK state and log the event. */ if (server.aof_last_write_status == REDIS_ERR) { redisLog(REDIS_WARNING, "AOF write error looks solved, Redis can write again."); server.aof_last_write_status = REDIS_OK; } } server.aof_current_size += nwritten; /* Re-use AOF buffer when it is small enough. The maximum comes from the * arena size of 4k minus some overhead (but is otherwise arbitrary). */ if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { sdsclear(server.aof_buf); } else { sdsfree(server.aof_buf); server.aof_buf = sdsempty(); } /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (server.aof_no_fsync_on_rewrite && (server.aof_child_pid != -1 || server.rdb_child_pid != -1)) return; /* Perform the fsync if needed. */ if (server.aof_fsync == AOF_FSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ latencyStartMonitor(latency); aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) aof_background_fsync(server.aof_fd); server.aof_last_fsync = server.unixtime; } }
/* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. * * In order to minimize the number of commands needed in the rewritten * log Redis uses variadic commands when possible, such as RPUSH, SADD * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time * are inserted using a single command. */ /* 將數(shù)據(jù)庫(kù)的內(nèi)容按照鍵值,再次完全重寫入文件中 */ int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; rio aof; FILE *fp; char tmpfile[256]; int j; long long now = mstime(); /* Note that we have to use a different temp name here compared to the * one used by rewriteAppendOnlyFileBackground() function. */ snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); return REDIS_ERR; } rioInitWithFile(&aof,fp); if (server.aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES); for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* SELECT the new DB */ if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ //遍歷數(shù)據(jù)庫(kù)中的每條記錄,進(jìn)行日志記錄 while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; long long expiretime; keystr = dictGetKey(de); o = dictGetVal(de); initStaticStringObject(key,keystr); expiretime = getExpire(db,&key); /* If this key is already expired skip it */ if (expiretime != -1 && expiretime < now) continue; /* Save the key and associated value */ if (o->type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { if (rewriteListObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_SET) { if (rewriteSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_ZSET) { if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_HASH) { if (rewriteHashObject(&aof,&key,o) == 0) goto werr; } else { redisPanic("Unknown object type"); } /* Save the expire time */ if (expiretime != -1) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; } } dictReleaseIterator(di); } /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); unlink(tmpfile); return REDIS_ERR; } redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed"); return REDIS_OK; werr: fclose(fp); unlink(tmpfile); redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); if (di) dictReleaseIterator(di); return REDIS_ERR; }
/* This is how rewriting of the append only file in background works: * * 1) The user calls BGREWRITEAOF * 2) Redis calls this function, that forks(): * 2a) the child rewrite the append only file in a temp file. * 2b) the parent accumulates differences in server.aof_rewrite_buf. * 3) When the child finished '2a' exists. * 4) The parent will trap the exit code, if it's OK, will append the * data accumulated into server.aof_rewrite_buf into the temp file, and * finally will rename(2) the temp file in the actual file name. * The the new file is reopened as the new append only file. Profit! */ /* 后臺(tái)進(jìn)行AOF數(shù)據(jù)文件寫入操作 */ int rewriteAppendOnlyFileBackground(void)
/* aof.c 中的API */ void aofRewriteBufferReset(void) /* 釋放server中舊的buffer,并創(chuàng)建一份新的buffer */ unsigned long aofRewriteBufferSize(void) /* 返回當(dāng)前AOF的buffer的總大小 */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) /* 在緩沖區(qū)中追加數(shù)據(jù),如果超出空間,會(huì)新申請(qǐng)一個(gè)緩沖塊 */ ssize_t aofRewriteBufferWrite(int fd) /* 將保存內(nèi)存中的buffer內(nèi)容寫入到文件中,也是分塊分塊的寫入 */ void aof_background_fsync(int fd) /* 開啟后臺(tái)線程進(jìn)行文件同步操作 */ void stopAppendOnly(void) /* 停止追加數(shù)據(jù)操作,這里用的是一個(gè)命令模式 */ int startAppendOnly(void) /* 開啟追加模式 */ void flushAppendOnlyFile(int force) /* 刷新緩存區(qū)的內(nèi)容到磁盤中 */ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) /* 根據(jù)輸入的字符串,進(jìn)行參數(shù)包裝,再次
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com