Redis 的監(jiān)視機(jī)制允許某一個(gè)客戶端監(jiān)視 Redis 服務(wù)器的行為,這種服務(wù)對(duì)于測(cè)試來(lái)說(shuō)比較有幫助。
監(jiān)視機(jī)制通過(guò) monitor 這個(gè)命令來(lái)實(shí)現(xiàn)。來(lái)看看它的實(shí)現(xiàn):Redis 在這里只是簡(jiǎn)單講這個(gè)客戶端加到一個(gè) redis.monitors 鏈表中,接著就回復(fù) ok 給客戶端。
void monitorCommand(redisClient *c) {
/* ignore MONITOR if already slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
listAddNodeTail(server.monitors,c);
addReply(c,shared.ok);
}
這里可以想象,當(dāng)這個(gè) Redis 服務(wù)器處理其他命令的時(shí)候,會(huì)向這個(gè)鏈表中的所有客戶端發(fā)送通知。我們找到執(zhí)行命令的核心函數(shù) call(),可以發(fā)現(xiàn)確實(shí)是這么做的:
// call() 函數(shù)是執(zhí)行命令的核心函數(shù),真正執(zhí)行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
int client_old_flags = c->flags;
/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
......
}
replicationFeedMonitors() 的實(shí)現(xiàn)實(shí)際上就是將命令打包好,發(fā)送給每個(gè)監(jiān)視器:
// 向監(jiān)視器發(fā)送數(shù)據(jù)
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid,
robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
char peerid[REDIS_PEER_ID_LEN];
struct timeval tv;
// 時(shí)間
gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
// 各種不同的客戶端
if (c->flags & REDIS_LUA_CLIENT) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & REDIS_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
getClientPeerId(c,peerid,sizeof(peerid));
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,peerid);
}
for (j = 0; j < argc; j++) {
if (argv[j]->encoding == REDIS_ENCODING_INT) {
cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
} else {
cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
sdslen(argv[j]->ptr));
}
if (j != argc-1)
cmdrepr = sdscatlen(cmdrepr," ",1);
}
cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
cmdobj = createObject(REDIS_STRING,cmdrepr);
// 發(fā)送
listRewind(monitors,&li);
while((ln = listNext(&li))) {
redisClient *monitor = ln->value;
addReply(monitor,cmdobj);
}
decrRefCount(cmdobj);
}