在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 主從復(fù)制
Redis 數(shù)據(jù)淘汰機(jī)制
積分排行榜
小剖 Memcache
Redis 數(shù)據(jù)結(jié)構(gòu) intset
分布式鎖
從哪里開始讀起,怎么讀
Redis 數(shù)據(jù)結(jié)構(gòu) dict
不在浮沙筑高臺(tái)
Redis 集群(上)
Redis 監(jiān)視器
源碼閱讀工具
Redis 日志和斷言
內(nèi)存數(shù)據(jù)管理
Redis 數(shù)據(jù)結(jié)構(gòu)綜述
源碼日志
Web 服務(wù)器存儲(chǔ) session
消息中間件
Redis 與 Lua 腳本
什么樣的源代碼適合閱讀
Redis 數(shù)據(jù)結(jié)構(gòu) sds
Memcached slab 分配策略
訂閱發(fā)布機(jī)制
Redis 是如何提供服務(wù)的
Redis 事務(wù)機(jī)制
Redis 集群(下)
主從復(fù)制
Redis 應(yīng)用
RDB 持久化策略
Redis 數(shù)據(jù)遷移
Redis 事件驅(qū)動(dòng)詳解
初探 Redis
Redis 與 Memcache
AOF 持久化策略
Redis 數(shù)據(jù)結(jié)構(gòu) redisOb
作者簡介
Redis 數(shù)據(jù)結(jié)構(gòu) ziplist
Redis 數(shù)據(jù)結(jié)構(gòu) skiplist
Redis 哨兵機(jī)制

主從復(fù)制

概述

Redis 支持 master-slave(主從)模式,一個(gè) redis server 可以設(shè)置為另一個(gè) redis server 的主機(jī)(從機(jī)),從機(jī)定期從主機(jī)拿數(shù)據(jù)。特殊的,一個(gè)從機(jī)同樣可以設(shè)置為一個(gè) redis server 的主機(jī),這樣一來 master-slave 的分布看起來就是一個(gè)有向無環(huán)圖 DAG,如此形成 redis server 集群,無論是主機(jī)還是從機(jī)都是 redis server,都可以提供服務(wù)。

http://wiki.jikexueyuan.com/project/redis/images/redis22.png" alt="" />

在配置后,主機(jī)可負(fù)責(zé)讀寫服務(wù),從機(jī)只負(fù)責(zé)讀。Redis 提高這種配置方式,為的是讓其支持?jǐn)?shù)據(jù)的弱一致性,即最終一致性。在業(yè)務(wù)中,選擇強(qiáng)一致性還是弱一致性,應(yīng)該取決于具體的業(yè)務(wù)需求,比如微博里的 timeline,可以使用弱一致性模型;比如支付寶的支付賬單,要選用強(qiáng)一致性模型。

積壓空間

binlog 是在 mysql 中的一種日志類型,它記錄了所有數(shù)據(jù)庫自備份一來的所有更新操作或潛在的更新操作,描述了數(shù)據(jù)的更改。因?yàn)?binlog 只記錄了數(shù)據(jù)的更新,所以適合用來做實(shí)時(shí)備份和主從復(fù)制。同樣,Redis 在主從復(fù)制上用的就是一種類似 binlog 的日志。在《AOF 持久化策略》中,介紹了更新緩存的概念,舉一個(gè)例子:客戶端發(fā)來命令:set name Jhon,這一數(shù)據(jù)更新被記錄為:*3/r/n$3/r/nSET/r/n$4/r/nname/r/n$3/r/nJhon/r/n,并存儲(chǔ)在更新緩存中。

同樣,在主從連接中,也有更新緩存的概念。只是兩者的用途不一樣,前者被寫入本地,后者被寫入從機(jī),這里我們把它成為積壓空間。更新緩存存儲(chǔ)在 server.repl_backlog,Redis 將其作為一個(gè)環(huán)形空間來處理,這樣做節(jié)省了空間,避免內(nèi)存再分配的情況。

struct redisServer {
    ......
    /* Replication (master) */
    // 最近一次使用(訪問)的數(shù)據(jù)集
    int slaveseldb; /* Last SELECTed DB in replication output */
    // 全局的數(shù)據(jù)同步偏移量
    long long master_repl_offset; /* Global replication offset */
    // 主從連接心跳頻率
    int repl_ping_slave_period; /* Master pings the slave every N seconds */
    // 積壓空間指針
    char *repl_backlog; /* Replication backlog for partial syncs */
    // 積壓空間大小
    long long repl_backlog_size; /* Backlog circular buffer size */
    // 積壓空間中寫入的新數(shù)據(jù)的大小
    long long repl_backlog_histlen; /* Backlog actual data length */
    // 下一次向積壓空間寫入數(shù)據(jù)的起始位置
    long long repl_backlog_idx; /* Backlog circular buffer current offset */
    // 積壓數(shù)據(jù)的起始位置的所對(duì)應(yīng)的全局主從復(fù)制偏移量
    long long repl_backlog_off; /* Replication offset of first byte in the
    backlog buffer. */
    // 積壓空間有效時(shí)間
    time_t repl_backlog_time_limit; /* Time without slaves after the backlog
    gets released. */
    ......
}

積壓空間中的數(shù)據(jù)變更記錄是什么時(shí)候被寫入的?在執(zhí)行一個(gè) Redis 命令的時(shí)候,如果存在數(shù)據(jù)的修改(寫),那么就會(huì)把變更記錄傳播。Redis 源碼中是這么實(shí)現(xiàn)的:call()->propagate()->replicationFeedSlaves()。

需注意,命令真正執(zhí)行的地方在 call() 中,call() 如果發(fā)現(xiàn)數(shù)據(jù)被修改(dirty),則傳播 propagrate(),replicationFeedSlaves() 將修改記錄寫入積壓空間和所有已連接的從機(jī)。

同樣,在《AOF 持久化策略》提到的,propagrate() 也會(huì)將數(shù)據(jù)的修改記錄寫入到更新緩存中。

這里可能會(huì)有疑問:為什么把數(shù)據(jù)添加入積壓空間,又把數(shù)據(jù)分發(fā)給所有的從機(jī)?為什么不僅僅將數(shù)據(jù)分發(fā)給所有從機(jī)呢?

因?yàn)橛幸恍臋C(jī)會(huì)因特殊情況,與主機(jī)斷開連接。從機(jī)斷開前有暫存主機(jī)的狀態(tài)信息,因此這些斷開的從機(jī)就沒有及時(shí)收到更新的數(shù)據(jù)。Redis 為了讓斷開的從機(jī)在下次連接后能夠獲取更新數(shù)據(jù),將更新數(shù)據(jù)加入了積壓空間。從replicationFeedSlaves() 實(shí)現(xiàn)來看,在線的 slave 能馬上收到數(shù)據(jù)更新記錄;因某些原因暫時(shí)斷開連接的slave,需要從積壓空間中找回?cái)嚅_期間的數(shù)據(jù)更新記錄。如果斷開的時(shí)間足夠長,master 會(huì)拒絕 slave 的部分 同步請(qǐng)求,從而slave 只能進(jìn)行全同步。

下面是更細(xì)積壓空間的核心代碼注釋:首先,在命令執(zhí)行函數(shù)中,如果發(fā)現(xiàn)是涉及寫的命令,會(huì)將修改傳播,即調(diào)用 propagrate()

// call() 函數(shù)是執(zhí)行命令的核心函數(shù),真正執(zhí)行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
    ......
    /* Call the command. */
    c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);
    redisOpArrayInit(&server.also_propagate);
    // 臟數(shù)據(jù)標(biāo)記,數(shù)據(jù)是否被修改
    dirty = server.dirty;
    // 執(zhí)行命令對(duì)應(yīng)的函數(shù)
    c->cmd->proc(c);
    dirty = server.dirty-dirty;
    duration = ustime()-start;
    ......
    // 將客戶端請(qǐng)求的數(shù)據(jù)修改記錄傳播給AOF 和從機(jī)
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;
        // 強(qiáng)制主從復(fù)制
    if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        // 強(qiáng)制AOF 持久化
    if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        // 數(shù)據(jù)被修改
    if (dirty)
        flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        // 傳播數(shù)據(jù)修改記錄
    if (flags != REDIS_PROPAGATE_NONE)
        propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    ......
}

主要向兩個(gè)方向傳播修改記錄,一個(gè)是 AOF 持久化,另一個(gè)則是主從復(fù)制。

// 向AOF 和從機(jī)發(fā)布數(shù)據(jù)更新
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
**
flags are an xor between:
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
             int flags)
{
    // AOF 策略需要打開,且設(shè)置AOF 傳播標(biāo)記,將更新發(fā)布給本地文件
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
        // 設(shè)置了從機(jī)傳播標(biāo)記,將更新發(fā)布給從機(jī)
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

向從機(jī)傳播更新記錄的時(shí)候,Redis 主機(jī)會(huì)向所有的從機(jī)發(fā)送變更記錄,同時(shí)也會(huì)寫入到積壓空間,方便已經(jīng)斷開的從機(jī),再下一次重新連接的時(shí)候,拷貝數(shù)據(jù)。

// 向積壓空間和從機(jī)發(fā)送數(shù)據(jù)
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];
    // 沒有積壓數(shù)據(jù)且沒有從機(jī),直接退出
    /* If there aren't slaves, and there is no backlog buffer to populate,
    * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
        /* We can't have slaves attached and no backlog. */
        redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
        /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;
        // 小于等于10 的可以用共享對(duì)象
        /* For a few DBs we have pre-computed SELECT command. */
    if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
        selectcmd = shared.select[dictid];
    } else {
        // 不能使用共享對(duì)象,生成SELECT 命令對(duì)應(yīng)的redis 對(duì)象
        int dictid_len;
        dictid_len = ll2string(llstr,sizeof(llstr),dictid);
        selectcmd = createObject(REDIS_STRING,
        sdscatprintf(sdsempty(),
        "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
        dictid_len, llstr));
    }
    // 這里可能會(huì)有疑問:為什么把數(shù)據(jù)添加入積壓空間,又把數(shù)據(jù)分發(fā)給所有的從機(jī)?
    // 為什么不僅僅將數(shù)據(jù)分發(fā)給所有從機(jī)呢?
    // 因?yàn)橛幸恍臋C(jī)會(huì)因特殊情況,與主機(jī)斷開連接。從機(jī)斷開前有暫存
    // 主機(jī)的狀態(tài)信息,因此這些斷開的從機(jī)就沒有及時(shí)收到更新的數(shù)據(jù)。redis 為了讓
    // 斷開的從機(jī)在下次連接后能夠獲取更新數(shù)據(jù),將更新數(shù)據(jù)加入了積壓空間。
    // 將SELECT 命令對(duì)應(yīng)的redis 對(duì)象數(shù)據(jù)添加到積壓空間
    /* Add the SELECT command into the backlog. */
    if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
        // 將數(shù)據(jù)分發(fā)所有的從機(jī)
        /* Send it to slaves. */
        listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
        addReply(slave,selectcmd);
    }
    // 銷毀對(duì)象
    if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
        decrRefCount(selectcmd);
    }
    // 更新最近一次使用(訪問)的數(shù)據(jù)集
    server.slaveseldb = dictid;
    // 將命令寫入積壓空間
    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];
        // 命令個(gè)數(shù)
        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
        // 逐個(gè)命令寫入
    for (j = 0; j < argc; j++) {
        long objlen = stringObjectLen(argv[j]);
        /* We need to feed the buffer with the object as a bulk reply
        * not just as a plain string, so create the $..CRLF payload len
        * ad add the final CRLF */
        aux[0] = '$';
        len = ll2string(aux+1,sizeof(aux)-1,objlen);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        /* 每個(gè)命令格式如下:
        $3
        *3
        SET
        *4
        NAME
        *4
        Jhon*/
        // 命令長度
        feedReplicationBacklog(aux,len+3);
        // 命令
        feedReplicationBacklogWithObject(argv[j]);
        // 換行
        feedReplicationBacklog(aux+len+1,2);
        }
    }
    // 立即給每一個(gè)從機(jī)發(fā)送命令
    /* Write the command to every slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
        // 如果從機(jī)要求全同步,則不對(duì)此從機(jī)發(fā)送數(shù)據(jù)
        /* Don't feed slaves that are still waiting for BGSAVE to start */
    if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
        /* Feed slaves that are waiting for the initial SYNC (so these commands
        * are queued in the output buffer until the initial SYNC completes),
        * or are already in sync with the master. */
        // 向從機(jī)命令的長度
        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);
    }
}

主從數(shù)據(jù)同步機(jī)制概述

Redis 主從同步有兩種方式(或者所兩個(gè)階段):全同步和部分同步。

主從剛剛連接的時(shí)候,進(jìn)行全同步;全同步結(jié)束后,進(jìn)行部分同步。如果有需要,slave 在任何時(shí)候都可以發(fā)起全同步。Redis 策略是,無論如何,首先會(huì)嘗試進(jìn)行部分同步,如不成功,要求從機(jī)進(jìn)行全同步,并啟動(dòng)BGSAVE??BGSAVE 結(jié)束后,傳輸 RDB 文件;如果成功,允許從機(jī)進(jìn)行部分同步,并傳輸積壓空間中的數(shù)據(jù)。

http://wiki.jikexueyuan.com/project/redis/images/r1.png" alt="" />

如需設(shè)置 slave,master 需要向slave 發(fā)送 SLAVEOF hostname port,從機(jī)接收到后會(huì)自動(dòng)連接主機(jī),注冊(cè)相應(yīng)讀寫事件(syncWithMaster())。

 // 修改主機(jī)
void slaveofCommand(redisClient *c) {
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        // slaveof no one 斷開主機(jī)連接
    if (server.masterhost) {
        replicationUnsetMaster();
        redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
    }
    } else {
        long port;
    if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
        return;
        // 可能已經(jīng)連接需要連接的主機(jī)
        /* Check if we are already attached to the specified slave */
    if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
        && server.masterport == port) {
        redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
        return;
    }
    // 斷開之前連接主機(jī)的連接,連接新的。replicationSetMaster() 并不會(huì)真正連接主機(jī),只是修改/* There was no previous master or the user specified a different one,
    * we can continue. */
    replicationSetMaster(c->argv[1]->ptr, port);
    redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
    server.masterhost, server.masterport);
        }
        addReply(c,shared.ok);
    }
    // 設(shè)置新主機(jī)
    /* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    // 斷開之前主機(jī)的連接
    if (server.master) freeClient(server.master);
        disconnectSlaves(); /* Force our slaves to resync with us as well. */
        // 取消緩存主機(jī)
        replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
        // 釋放積壓空間
        freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
        // cancelReplicationHandshake() 嘗試斷開數(shù)據(jù)傳輸和主機(jī)連接
        cancelReplicationHandshake();
        server.repl_state = REDIS_REPL_CONNECT;
        server.master_repl_offset = 0;
    }
    // 管理主從連接的定時(shí)程序定時(shí)程序,每秒執(zhí)行一次
    // 在serverCorn() 中調(diào)用
    /* --------------------------- REPLICATION CRON ----------------------------- */
    /* Replication cron funciton, called 1 time per second. */
void replicationCron(void) {
    ......
    // 如果需要(EDIS_REPL_CONNECT),嘗試連接主機(jī),真正連接主機(jī)的操作在這里
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
        server.masterhost, server.masterport);
    if (connectWithMaster() == REDIS_OK) {
        redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    ......
}

全同步

無論如何,Redis 首先會(huì)嘗試部分同步,如果失敗才嘗試全同步。而剛剛建立連接的 master-slave 需要全同步。

從機(jī)連接主機(jī)后,會(huì)主動(dòng)發(fā)起 PSYNC 命令,從機(jī)會(huì)提供 master_runid 和offset,主機(jī)驗(yàn)證 master_runid 和 offset 是否有效?master_runid 相當(dāng)于主機(jī)身份驗(yàn)證碼,用來驗(yàn)證從機(jī)上一次連接的主機(jī),offset 是全局積壓空間數(shù)據(jù)的偏移量。

驗(yàn)證未通過則,則進(jìn)行全同步:主機(jī)返回+FULLRESYNC master_runid offset(從機(jī)接收并記錄 master_runid 和 offset,并準(zhǔn)備接收RDB 文件)接著啟動(dòng) BGSAVE 生成 RDB 文件,BGSAVE 結(jié)束后,向從機(jī)傳輸,從而完成全同步。

主機(jī)和從機(jī)之間的交互圖如下:

http://wiki.jikexueyuan.com/project/redis/images/r2.png" alt="" />

// 連接主機(jī)connectWithMaster() 的時(shí)候,會(huì)被注冊(cè)為回調(diào)函數(shù)
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // 這里嘗試向主機(jī)請(qǐng)求部分同步,主機(jī)會(huì)回復(fù)以拒絕或接受請(qǐng)求。如果拒絕部分同步,
    // 會(huì)返回+FULLRESYNC master_runid offset
    // 從機(jī)接收后準(zhǔn)備進(jìn)行全同步
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
        "Partial Resynchronization.");
        return;
    }
    // 執(zhí)行全同步
    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
    * and the server.repl_master_runid and repl_master_initial_offset are
    * already populated. */
    // 未知結(jié)果,進(jìn)行出錯(cuò)處理
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
    if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
                goto error;
        }
    }
    // 為什么要嘗試5 次???
    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
        "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
    if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> "
            "SLAVE synchronization: %s",strerror(errno));
                goto error;
        }
    // 注冊(cè)讀事件,回調(diào)函數(shù)readSyncBulkPayload(), 準(zhǔn)備讀RDB 文件
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
        == AE_ERR)
    {
        redisLog(REDIS_WARNING,
        "Can't create readable event for SYNC: %s (fd=%d)",
        strerror(errno),fd);
        goto error;
    }
    // 設(shè)置傳輸RDB 文件數(shù)據(jù)的選項(xiàng)
    // 狀態(tài)
    server.repl_state = REDIS_REPL_TRANSFER;
    // RDB 文件大小
    server.repl_transfer_size = -1;
    // 已經(jīng)傳輸?shù)拇笮?    server.repl_transfer_read = 0;
    // 上一次同步的偏移,為的是定時(shí)寫入磁盤
    server.repl_transfer_last_fsync_off = 0;
    // 本地RDB 文件套接字
    server.repl_transfer_fd = dfd;
    // 上一次同步IO 時(shí)間
    server.repl_transfer_lastio = server.unixtime;
    // 臨時(shí)文件名
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;
error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}

全同步請(qǐng)求的數(shù)據(jù)是RDB 數(shù)據(jù)文件和積壓空間中的數(shù)據(jù)。關(guān)于 RDB 數(shù)據(jù)文件,請(qǐng)參見《RDB 持久化策略》。如果沒有后臺(tái)持久化 BGSAVE 進(jìn)程,那么 BGSVAE 會(huì)被觸發(fā),否則所有請(qǐng)求全同步的 slave 都會(huì)被標(biāo)記為等待 BGSAVE 結(jié)束。BGSAVE 結(jié)束后,master 會(huì)馬上向所有的從機(jī)發(fā)送 RDB 文件。

下面 syncCommand() 摘取全同步的部分:

// 主機(jī)SYNC 和PSYNC 命令處理函數(shù),會(huì)嘗試進(jìn)行部分同步和全同步
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
    // 主機(jī)嘗試部分同步,失敗的話向從機(jī)發(fā)送+FULLRESYNC master_runid offset,
    // 接著啟動(dòng)BGSAVE
    // 執(zhí)行全同步:
    /* Full resynchronization. */
    server.stat_sync_full++;
    /* Here we need to check if there is a background saving operation
    * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* 存在BGSAVE 后臺(tái)進(jìn)程。
        1. 如果master 現(xiàn)有所連接的所有從機(jī)slaves 當(dāng)中有存在
        REDIS_REPL_WAIT_BGSAVE_END 的從機(jī),那么將從機(jī)c 設(shè)置為
        REDIS_REPL_WAIT_BGSAVE_END;
        2. 否則,設(shè)置為REDIS_REPL_WAIT_BGSAVE_START*/
        /* Ok a background save is in progress. Let's check if it is a good
        * one for replication, i.e. if there is another slave that is
        * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;
        // 檢測是否已經(jīng)有從機(jī)申請(qǐng)全同步
        listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        slave = ln->value;
    if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
    if (ln) {
        // 存在狀態(tài)為REDIS_REPL_WAIT_BGSAVE_END 的從機(jī)slave,
        // 就將此從機(jī)c 狀態(tài)設(shè)置為REDIS_REPL_WAIT_BGSAVE_END,
        // 從而在BGSAVE 進(jìn)程結(jié)束后,可以發(fā)送RDB 文件,
        // 同時(shí)將從機(jī)slave 中的更新復(fù)制到此從機(jī)c。
        /* Perfect, the server is already registering differences for
        * another slave. Set the right state, and copy the buffer. */
        // 將其他從機(jī)上的待回復(fù)的緩存復(fù)制到從機(jī)c
        copyClientOutputBuffer(c,slave);
        // 修改從機(jī)c 狀態(tài)為「等待BGSAVE 進(jìn)程結(jié)束」
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
    } else {
        // 不存在狀態(tài)為REDIS_REPL_WAIT_BGSAVE_END 的從機(jī),就將此從機(jī)c 狀態(tài)設(shè)置為
        // REDIS_REPL_WAIT_BGSAVE_START,即等待新的BGSAVE 進(jìn)程的開啟。
        // 修改狀態(tài)為「等待BGSAVE 進(jìn)程開始」
        /* No way, we need to wait for the next BGSAVE in order to
        * register differences */
        c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
        redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
    }
    } else {
        // 不存在BGSAVE 后臺(tái)進(jìn)程,啟動(dòng)一個(gè)新的BGSAVE 進(jìn)程
        * Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
    if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
        redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
        addReplyError(c,"Unable to perform background save");
        return;
    }
    // 將此從機(jī)c 狀態(tài)設(shè)置為REDIS_REPL_WAIT_BGSAVE_END,從而在BGSAVE
    // 進(jìn)程結(jié)束后,可以發(fā)送RDB 文件,同時(shí)將從機(jī)slave 中的更新復(fù)制到此從機(jī)c。
    c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    // 清理腳本緩存???
    /* Flush the script cache for the new slave. */
    replicationScriptCacheFlush();
    }
    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
        c->repldbfd = -1;
        c->flags |= REDIS_SLAVE;
        server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
        listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}

主機(jī)執(zhí)行完 BGSAVE 后,會(huì)將 RDB 文件發(fā)送給從機(jī)。

// BGSAVE 結(jié)束后,會(huì)調(diào)用
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // 其他操作
    ......
    // 可能從機(jī)正在等待BGSAVE 進(jìn)程的終止
    /* Possibly there are slaves waiting for a BGSAVE in order to be served
    * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
    }
    // 當(dāng)RDB 持久化(backgroundSaveDoneHandler()) 結(jié)束后,會(huì)調(diào)用此函數(shù)
    // RDB 文件就緒,給所有的從機(jī)發(fā)送RDB 文件
    /* This function is called at the end of every background saving.
    * The argument bgsaveerr is REDIS_OK if the background saving succeeded
    * otherwise REDIS_ERR is passed to the function.
    **
    The goal of this function is to handle slaves waiting for a successful
    * background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
        // 等待BGSAVE 開始。調(diào)整狀態(tài)為等待下一次BGSAVE 進(jìn)程的結(jié)束
    if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
        startbgsave = 1;
        slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
    // 等待BGSAVE 結(jié)束。準(zhǔn)備向slave 發(fā)送RDB 文件
    } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
        struct redis_stat buf;
        // 如果RDB 持久化失敗, bgsaveerr 會(huì)被設(shè)置為REDIS_ERR
    if (bgsaveerr != REDIS_OK) {
        freeClient(slave);
        redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned "
        "an error");
        continue;
    }
    // 打開RDB 文件
    if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
        redis_fstat(slave->repldbfd,&buf) == -1) {
        freeClient(slave);
        redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after"
        " BGSAVE: %s", strerror(errno));
        continue;
    }
    slave->repldboff = 0;
    slave->repldbsize = buf.st_size;
    slave->replstate = REDIS_REPL_SEND_BULK;
    // 如果之前有注冊(cè)寫事件,取消
    aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
    // 注冊(cè)新的寫事件,sendBulkToSlave() 傳輸RDB 文件
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendBulkToSlave, slave) == AE_ERR) {
        freeClient(slave);
        continue;
        }
    }
}
    // startbgsave == REDIS_ERR 表示BGSAVE 失敗,再一次進(jìn)行BGSAVE 嘗試
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
        * we flush the Replication Script Cache to use EVAL to propagate every
        * new EVALSHA for the first time, since all the new slaves don't know
        * about previous scripts. */
        replicationScriptCacheFlush();
    if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
        /*BGSAVE 可能fork 失敗,所有等待BGSAVE 的從機(jī)都將結(jié)束連接。這是
        redis 自我保護(hù)的措施,fork 失敗很可能是內(nèi)存緊張*/
        listIter li;
        listRewind(server.slaves,&li);
        redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;
    if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
        freeClient(slave);
        }
     }
  }
}

部分同步

如上所說,無論如何,Redis 首先會(huì)嘗試部分同步。部分同步即把積壓空間緩存的數(shù)據(jù),即更新記錄發(fā)送給從機(jī)。

從機(jī)連接主機(jī)后,會(huì)主動(dòng)發(fā)起 PSYNC 命令,從機(jī)會(huì)提供 master_runid 和offset,主機(jī)驗(yàn)證 master_runid 和 offset 是否有效?驗(yàn)證通過則,進(jìn)行部分同步:主機(jī)返回 +CONTINUE(從機(jī)接收后會(huì)注冊(cè)積壓數(shù)據(jù)接收事件),接著發(fā)送積壓空間數(shù)據(jù)。

主機(jī)和從機(jī)之間的交互圖如下:

http://wiki.jikexueyuan.com/project/redis/images/r3.png" alt="" />

syncWithMaster() 已經(jīng)被設(shè)置為回調(diào)函數(shù),當(dāng)與主機(jī)建立連接后,syncWithMaster() 會(huì)被回調(diào),這一點(diǎn)查閱在 connectWithMaster() 函數(shù)。首先如果該從機(jī)從未與主機(jī)有過連接,那么會(huì)進(jìn)行全同步,從主機(jī)拷貝所有的數(shù)據(jù);否則,會(huì)嘗試進(jìn)行部分同步。

// 連接主機(jī)connectWithMaster() 的時(shí)候,會(huì)被注冊(cè)為回調(diào)函數(shù)
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    ......
    // 嘗試部分同步,主機(jī)允許進(jìn)行部分同步會(huì)返回+CONTINUE,從機(jī)接收后注冊(cè)相應(yīng)的事件
    /* Try a partial resynchonization. If we don't have a cached master
    * slaveTryPartialResynchronization() will at least try to use PSYNC
    * to start a full resynchronization so that we get the master run id
    * and the global offset, to try a partial resync at the next
    * reconnection attempt. */
    // 函數(shù)返回三種狀態(tài):
    // PSYNC_CONTINUE:表示會(huì)進(jìn)行部分同步,在slaveTryPartialResynchronization()
    // 中已經(jīng)設(shè)置回調(diào)函數(shù)readQueryFromClient()
    // PSYNC_FULLRESYNC:全同步,會(huì)下載RDB 文件
    // PSYNC_NOT_SUPPORTED:未知
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a "
        "Partial Resynchronization.");
        return;
    }
    // 執(zhí)行全同步
    ......
}

slaveTryPartialResynchronization() 主要工作是判斷是進(jìn)行全同步還是部分同步。

// 函數(shù)返回三種狀態(tài):
// PSYNC_CONTINUE:表示會(huì)進(jìn)行部分同步,已經(jīng)設(shè)置回調(diào)函數(shù)
// PSYNC_FULLRESYNC:全同步,會(huì)下載RDB 文件
// PSYNC_NOT_SUPPORTED:未知
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;
    /* Initially set repl_master_initial_offset to -1 to mark the current
    * master run_id and offset as not valid. Later if we'll be able to do
    * a FULL resync using the PSYNC command we'll set the offset at the
    * right value, so that this information will be propagated to the
    * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;
    if (server.cached_master) {
        // 緩存了上一次與主機(jī)連接的信息,可以嘗試進(jìn)行部分同步,減少數(shù)據(jù)傳輸
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld",
        server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization "
        "(request %s:%s).", psync_runid, psync_offset);
    } else {
        // 未緩存上一次與主機(jī)連接的信息,進(jìn)行全同步
        // psync ? -1 可以獲取主機(jī)的master_runid
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible "
        "(no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }
    // 向主機(jī)發(fā)送命令,并接收回復(fù)
    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
    // 全同步
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;
        /* FULL RESYNC, parse the reply in order to extract the run id
        * and the replication offset. */
        runid = strchr(reply,' ');
    if (runid) {
        runid++;
        offset = strchr(runid,' ');
    if (offset) offset++;
    }
    if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
        redisLog(REDIS_WARNING,
        "Master replied with wrong +FULLRESYNC syntax.");
        /* This is an unexpected condition, actually the +FULLRESYNC
        * reply means that the master supports PSYNC, but the reply
        * format seems wrong. To stay safe we blank the master
        * runid to make sure next PSYNCs will fail. */
        memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
    } else {
        // 拷貝runid
        memcpy(server.repl_master_runid, runid, offset-runid-1);
        server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
        server.repl_master_initial_offset = strtoll(offset,NULL,10);
        redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
        server.repl_master_runid,
        server.repl_master_initial_offset);
    }
    /* We are going to full resync, discard the cached master structure. */
    replicationDiscardCachedMaster();
    sdsfree(reply);
    return PSYNC_FULLRESYNC;
    }
    // 部分同步
    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
        "Successful partial resynchronization with master.");
        sdsfree(reply);
        // 緩存主機(jī)替代現(xiàn)有主機(jī),且為PSYNC(部分同步) 做好準(zhǔn)備
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }
    /* If we reach this point we receied either an error since the master does
    * not understand PSYNC, or an unexpected reply from the master.
    * Reply with PSYNC_NOT_SUPPORTED in both cases. */
    // 接收到主機(jī)發(fā)出的錯(cuò)誤信息
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
        "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
        "Master does not support PSYNC or is in "
        "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}

下面 syncCommand() 摘取部分同步的部分:

// 主機(jī)SYNC 和PSYNC 命令處理函數(shù),會(huì)嘗試進(jìn)行部分同步和全同步
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {
    ......
    // 主機(jī)嘗試部分同步,允許則進(jìn)行部分同步,會(huì)返回+CONTINUE,接著發(fā)送積壓空間
    /* Try a partial resynchronization if this is a PSYNC command.
    * If it fails, we continue with usual full resynchronization, however
    * when this happens masterTryPartialResynchronization() already
    * replied with:
    **
    +FULLRESYNC <runid> <offset>
    **
    So the slave knows the new runid and offset to try a PSYNC later
    * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        // 部分同步
    if (masterTryPartialResynchronization(c) == REDIS_OK) {
        server.stat_sync_partial_ok++;
        return; /* No full resync needed, return. */
    } else {
        // 部分同步失敗,會(huì)進(jìn)行全同步,這時(shí)會(huì)收到來自客戶端的runid
        char *master_runid = c->argv[1]->ptr;
        /* Increment stats for failed PSYNCs, but only if the
        * runid is not "?", as this is used by slaves to force a full
        * resync on purpose when they are not albe to partially
        * resync. */
    if (master_runid[0] != '?') server.stat_sync_partial_err++;
    }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
        * of the replication protocol (like redis-cli --slave). Flag the client
        * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC_SLAVE;
    }
    // 執(zhí)行全同步:
    ......
}

主機(jī)雖然收到了來自從機(jī)的部分同步的請(qǐng)求,但主機(jī)并不一定會(huì)允許進(jìn)行部分同步。在主機(jī)側(cè),如果收到部分同步的請(qǐng)求,還需要驗(yàn)證從機(jī)是否適合進(jìn)行部分同步。

// 主機(jī)嘗試是否能進(jìn)行部分同步
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
**
On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
* with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;
    /* Is the runid of this master the same advertised by the wannabe slave
    * via PSYNC? If runid changed this master is a different instance and
    * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        // 當(dāng)因?yàn)楫惓P枰c主機(jī)斷開連接的時(shí)候,從機(jī)會(huì)暫存主機(jī)的狀態(tài)信息,以便
        // 下一次的部分同步。
        // 1)master_runid 是從機(jī)提供一個(gè)因緩存主機(jī)的runid,
        // 2)server.runid 是本機(jī)(主機(jī))的runid。
        // 匹配失敗,說明是本機(jī)(主機(jī))不是從機(jī)緩存的主機(jī),這時(shí)候不能進(jìn)行部分同步,
        // 只能進(jìn)行全同步
        // "?" 表示從機(jī)要求全同步
        // 什么時(shí)候從機(jī)會(huì)要求全同步???
    /* Run id "?" is used by slaves that want to force a full resync. */
    if (master_runid[0] != '?') {
        redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
        "Runid mismatch (Client asked for '%s', I'm '%s')",
        master_runid, server.runid);
    } else {
        redisLog(REDIS_NOTICE,"Full resync requested by slave.");
    }
        goto need_full_resync;
    }
    // 從參數(shù)中解析整數(shù),整數(shù)是從機(jī)指定的偏移量
    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
        REDIS_OK) goto need_full_resync;
        // 部分同步失敗的情況:
        // 1、不存在積壓空間
    if (!server.repl_backlog ||
        // 2、psync_offset 太過小,即從機(jī)錯(cuò)過太多更新記錄,安全起見,實(shí)行全同步
        // 我們知道,積壓空間的大小是有限的,如果某個(gè)從機(jī)錯(cuò)過的更新過多,將無法
        // 在積壓空間中找到更新的記錄
        psync_offset 越界
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
        // 經(jīng)檢測,不滿足部分同步的條件,轉(zhuǎn)而進(jìn)行全同步
    {
    redisLog(REDIS_NOTICE,
    "Unable to partial resync with the slave for lack of backlog "
    "(Slave request was: %lld).", psync_offset);
    if (psync_offset > server. ) {
        redisLog(REDIS_WARNING,
        "Warning: slave tried to PSYNC with an offset that is "
        "greater than the master replication offset.");
    }
    goto need_full_resync;
    }
    // 執(zhí)行部分同步:
    // 1)標(biāo)記客戶端為從機(jī)
    // 2)通知從機(jī)準(zhǔn)備接收數(shù)據(jù)。從機(jī)收到+CONTINUE 會(huì)做好準(zhǔn)備
    // 3)開發(fā)發(fā)送數(shù)據(jù)
    /* If we reached this point, we are able to perform a partial resync:
    * 1) Set client state to make it a slave.
    * 2) Inform the client we can continue with +CONTINUE
    * 3) Send the backlog data (from the offset to the end) to the slave. */
    // 將連接的客戶端標(biāo)記為從機(jī)
    c->flags |= REDIS_SLAVE;
    // 表示進(jìn)行部分同步
    // #define REDIS_REPL_ONLINE 9 /* RDB file transmitted, sending just
    // updates. */
    c->replstate = REDIS_REPL_ONLINE;
    // 更新ack 的時(shí)間
    c->repl_ack_time = server.unixtime;
    // 添加入從機(jī)鏈表
    listAddNodeTail(server.slaves,c);
    // 告訴從機(jī)可以進(jìn)行部分同步,從機(jī)收到后會(huì)做相關(guān)的準(zhǔn)備(注冊(cè)回調(diào)函數(shù))
    /* We can't use the connection buffers since they are used to accumulate
    * new commands at this stage. But we are sure the socket send buffer is
    * emtpy so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    // 向從機(jī)寫積壓空間中的數(shù)據(jù),積壓空間存儲(chǔ)有「更新緩存」
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
    "Partial resynchronization request accepted. Sending %lld bytes of "
    "backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
    * to -1 to force the master to emit SELECT, since the slave already
    * has this state from the previous connection with the master. */
    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */
    need_full_resync:
    ......
    // 向從機(jī)發(fā)送+FULLRESYNC runid repl_offset
}

緩存主機(jī)

從機(jī)因?yàn)槟承┰?,譬如網(wǎng)絡(luò)延遲(PING 超時(shí),ACK 超時(shí)等),可能會(huì)斷開與主機(jī)的連接。這時(shí)候,從機(jī)會(huì)嘗試保存與主機(jī)連接的信息,譬如全局積壓空間數(shù)據(jù)偏移量等,以便下一次的部分同步,并且從機(jī)會(huì)再一次嘗試連接主機(jī)。注意一點(diǎn),如果斷開的時(shí)間足夠長,部分同步肯定會(huì)失敗的。

void freeClient(redisClient *c) {
    listNode *ln;
    /* If this is mar