接上文:Mongodb源码分析之Replication模式 、Mongodb源码分析之Replication模式第二部分
当使用复制集(Replica sets)模式时,其会使用下面的local数据库:
local.system.replset:用于存储复制集配置对象(可通过shell下的rs.conf()或直接查询查看);local.oplog.rs:一个capped collection,可在命令行下使用 --oplogSize 选项设置该集合的大小;local.replset.minvalid:通常在复制集内部使用,用于跟踪同步状态(sync status)。
* Master
o local.oplog.$main 存储”oplog”信息
o local.slaves 存储在master结点上相应slave结点的同步情况(比如syncTo时间戳等)
* Slave
o local.sources 从结点所要连接的master结点信息(可通过 --source 配置参数指定)
* Other
o local.me 未知待查:)
o local.pair.* (replica pairs选项,目前已不推荐使用)
{ ts : …, op: …, ns: …, o: … o2: … }
ts:8字节的时间戳,由4字节unix timestamp + 4字节自增计数表示。
o2: 在执行更新操作时的where条件,仅限于update时才有该属性
"i": insert
"u": update
"d": delete
"c": db cmd
"db": 声明当前数据库(其中 ns 被设置为:数据库名称 + '.')
"n": no op,即空操作,其会定期执行以确保时效性
D:\mongodb\bin>mongod --replSet myoplogs --dbpath d:\mongodb\db
D:\mongodb\bin>mongo
MongoDB shell version: …
connecting to: test
> rs.initiate()
> use local switched to db local > db.oplog.rs.find() {“ts” : { “t” : 1306207268000, “i” : 1 }, “h” : NumberLong(0), “op” : “n”, “ns” : “”, “o” : { “msg” : “initiating set” } } |
> use test switched to db test > db.foo.insert({x:1}) > db.foo.update({x:1}, {$set : {y:1}}) > db.foo.update({x:2}, {$set : {y:1}}, true) > db.foo.remove({x:1}) |
> use local switched to db local > db.oplog.rs.find() { “ts” : { “t” : 1306207268000, “i” : 1 }, “h” : NumberLong(0), “op” : “n”, “ns” : “”, “o” : { “msg” : “initiating set” } } { “ts” : { “t” : 1306207310000, “i” : 1 }, “h” : NumberLong(“3138280161636515857”), “op” : “i”, “ns” : “test.foo”, “o” : { “_id” : ObjectId(“4ddb244d2d0d00000000551a”), “x” : 1 } } { “ts” : { “t” : 1306207314000, “i” : 1 }, “h” : NumberLong(“772196482295043060”), “op” : “u”, “ns” : “test.foo”, “o2” : { “_id” : ObjectId(“4ddb244d2d0d00000000551a”) }, “o” : { “$set” : { “y” : 1 } } } { “ts” : { “t” : 1306207317000, “i” : 1 }, “h” : NumberLong(“7272647888810218413”), “op” : “i”, “ns” : “test.foo”, “o” : { “_id” : ObjectId(“4ddb2455be10000000002f6a”), “x” : 2, “y” : 1 } } { “ts” : { “t” : 1306207325000, “i” : 1 }, “h” : NumberLong(“3083832920223263240”), “op” : “d”, “ns” : “test.foo”, “b” : true, “o” : { “_id” : ObjectId(“4ddb244d2d0d00000000551a”) } } |
注:其它常用命令:db.oplog.$main.help() db.printReplicationInfo();
createOplog():初始化本地local.oplog.$main 集合信息,包括大小尺寸等
_logOpOld(): 用于在master/slave模式下向local.oplog.$main 添加oplog信息
_logOpObjRS(): 用于在replset模式下向local.oplog.rs集合添加oplog信息
//repl.cpp void startReplication() { /* if we are going to be a replica set, we aren’t doing other forms of replication. */ if( !cmdLine._replSet.empty() ) { …… //绑定函数,oplog.cpp ->_logOpRS(), 用于处理replset类型的oplog操作 newRepl(); return; } //绑定函数,oplog.cpp ->_logOpOld(), 用于处理master-slave类型的oplog操作 oldRepl(); |
//如果是master,则构造并启动线程方法replMasterThread if ( replSettings.master || replPair ) { if ( replSettings.master ) log(1) << “master=true” << endl; replSettings.master = true; //构造oplog集合”local.oplog.$main” createOplog(); boost::thread t(replMasterThread); } |
//repl.cpp void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail, bool alreadyLocked) { ….. if( cmdLine.pretouch && !alreadyLocked /*在非写锁下,因为写锁下进行pretouch操作没意义*/) { if( cmdLine.pretouch > 1 ) { /* note: this is bad – should be put in ReplSource. but this is first test… */ static int countdown; assert( countdown >= 0 ); if( countdown > 0 ) { countdown–; // was pretouched on a prev pass } else { const int m = 4; if( tp.get() == 0 ) { int nthr = min(8, cmdLine.pretouch); nthr = max(nthr, 1); tp.reset( new ThreadPool(nthr) ); } vector v; oplogReader.peek(v, cmdLine.pretouch); unsigned a = 0; while( 1 ) { if( a >= v.size() ) break; unsigned b = a + m – 1; // v[a..b] if( b >= v.size() ) b = v.size() – 1; tp->schedule(pretouchN, v, a, b); DEV cout << “pretouch task: ” << a << “..” << b << endl; a += m; } //执行oplog.cpp中的预取操作 pretouchOperation(op); tp->join(); countdown = v.size(); } } else { //执行oplog.cpp中的预取操作 pretouchOperation(op); } } …… //使用获取到的oplog操作信息进行同步操作,有关该方法本人已在这个链接中有关介绍 //Mongodb源码分析–Replication之主从模式–Slave applyOperation( op ); …… } |
//oplog.cpp中的相应方法 //某些情况下MongoDB会锁住数据库。如果此时正有数百个请求,则它们会堆积起来,造成许多问题。 //这里使用下面的优化方式来避免锁定: // 每次更新前,先查询记录。查询操作会将对象放入内存,于是更新则会尽可能的迅速。 //在主/从部署方案中,从节点可以使用“-pretouch”参数运行,这也可以得到相同的效果。 void pretouchOperation(const BSONObj& op) { // no point pretouching if write locked. //not sure if this will ever fire, but just in case. if( dbMutex.isWriteLocked() )//写锁下则不执行pretouch操作 return; const char *which = “o”;//数据的对象 const char *opType = op.getStringField(“op”); if ( *opType == ‘i’ ) ; else if( *opType == ‘u’ )//如是更新操作时,则执行touch which = “o2”;//更新的条件pattern else return; /* todo : other operations */ try { BSONObj o = op.getObjectField(which); BSONElement _id; //获取_id信息,用于下面find查找 if( o.getObjectID(_id) ) { const char *ns = op.getStringField(“ns”); BSONObjBuilder b; b.append(_id); BSONObj result; readlock lk(ns); Client::Context ctx( ns ); //获取要更新的obj if( Helpers::findById(cc(), ns, b.done(), result) ) _dummy_z += result.objsize(); // touch } } catch( DBException& ) { log() << “ignoring assertion in pretouchOperation()” << endl; } } |
/*@ @param opstr: c userCreateNS i insert n no-op / keepalive d delete / remove u update */ void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt, bool *b) { if ( replSettings.master ) { _logOp(opstr, ns, 0, obj, patt, b);//这里将会执行之前函数绑定的方法,比如:_logOpOld } //用于在shard模式下oplog操作 logOpForSharding( opstr , ns , obj , patt ); } |
//update.cpp static UpdateResult _updateById(bool isOperatorUpdate, int idIdxNo, ModSet *mods, int profile, NamespaceDetails *d, NamespaceDetailsTransient *nsdt, bool god, const char *ns, const BSONObj& updateobj, BSONObj patternOrig, bool logop, OpDebug& debug) { …… if ( logop ) { …… if( mss->needOpLogRewrite() ) { DEBUGUPDATE( “t rewrite update: ” << mss->getOpLogRewrite() ); logOp(“u”, ns, mss->getOpLogRewrite() , &pattern );//记录日志 } else { logOp(“u”, ns, updateobj, &pattern );//记录日志 } } ….. } |
/* we write to local.oplog.$main: { ts : …, op: …, ns: …, o: … o2: … } ts: an OpTime timestamp op: ”i” insert ”u” update ”d” delete ”c” db cmd ”db” declares presence of a database (ns is set to the db name + ‘.’) ”n” no op logNS – where to log it. 0/null means “local.oplog.$main”. bb: if not null, specifies a boolean to pass along to the other side as b: param. used for “justOne” or “upsert” flags on ‘d’, ‘u’ note this is used for single collection logging even when –replSet is enabled. */ static void _logOpOld(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) { DEV assertInWriteLock(); static BufBuilder bufbuilder(8*1024); //如是master上的local.slaves,则更新该数据库缓存信息 if ( strncmp(ns, “local.”, 6) == 0 ) { if ( strncmp(ns, “local.slaves”, 12) == 0 ) { resetSlaveCache(); } return; } //获取当前系统时间作为操作时间 const OpTime ts = OpTime::now(); Client::Context context; /* we jump through a bunch of hoops here to avoid copying the obj buffer twice — instead we do a single copy to the destination position in the memory mapped file. */ //生成bson格式的oplog的对象 bufbuilder.reset(); BSONObjBuilder b(bufbuilder); b.appendTimestamp(“ts”, ts.asDate()); b.append(“op”, opstr); b.append(“ns”, ns); if ( bb ) b.appendBool(“b”, *bb); if ( o2 )//如果不为空,表示执行操作的条件(where) b.append(“o2”, *o2); BSONObj partial = b.done(); // partial is everything except the o:… part. 
int po_sz = partial.objsize(); int len = po_sz + obj.objsize() + 1 + 2 /*o:*/; Record *r; //为0/null,表示”local.oplog.$main” if( logNS == 0 ) { logNS = “local.oplog.$main”; if ( localOplogMainDetails == 0 ) { Client::Context ctx( logNS , dbpath, 0, false); localDB = ctx.db(); assert( localDB ); localOplogMainDetails = nsdetails(logNS); assert( localOplogMainDetails ); } Client::Context ctx( logNS , localDB, false ); //在指定Namespace下的Extent中分配一个record空间,用于下面添加log信息 r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, logNS, len); } else { //在指定的logNS中分配一个record空间 Client::Context ctx( logNS, dbpath, 0, false ); assert( nsdetails( logNS ) ); // 如第一次分配first we allocate the space, then we fill it below. r = theDataFileMgr.fast_oplog_insert( nsdetails( logNS ), logNS, len); } //将操作条件(where)追加到已存在对象 append_O_Obj(r->data, partial, obj); //设置最近操作时间 context.getClient()->setLastOp( ts.asDate() ); if ( logLevel >= 6 ) { BSONObj temp(r); log( 6 ) << “logging op:” << temp << endl; } } /** given a BSON object, create a new one at dst which is the existing (partial) object with a new object element appended at the end with fieldname “o”. @param partial already build object with everything except the o member. e.g. something like: { ts:…, ns:…, o2:… } @param o a bson object to be added with fieldname “o” @dst where to put the newly built combined object. e.g. ends up as something like: { ts:…, ns:…, o2:…, o:… } */ void append_O_Obj(char *dst, const BSONObj& partial, const BSONObj& o) { const int size1 = partial.objsize() – 1; // less the EOO char const int oOfs = size1+3; // 3 = byte BSONOBJTYPE + byte ‘o’ + byte |