分析在MongoDB中正成为越来越重要的话题,因为它在越来越多的大型项目中使用。人们厌倦了使用不同的软件来做分析(包括Hadoop),它们显然需要传输大量开销的数据。
MongoDB提供了两种内置分析数据的方法:Map Reduce和Aggregation框架。MR非常灵活,很容易部署。它通过分区工作良好,并允许大量输出。MR在MongoDB v2.4中,通过使用JavaScript引擎把Spider Monkey替换成V8,性能提升很多。老板抱怨它太慢了,尤其是和Agg框架(使用C++)相比。让我们看看能否从中榨出点果汁。
练习
让我们插入1千万条文档,每个文档包含一个从0到1000000的整数。这意味着平均有10个文档会具有相同的值。
01> for (vari = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
02> db.uniques.findOne()
03{ “_id” : ObjectId(”51d3c386acd412e22c188dec”), “dim0” : 570859 }
04> db.uniques.ensureIndex({dim0: 1})
05> db.uniques.stats()
06{
07 “ns” : “test.uniques”,
08 “count” : 10000000,
09 “size” : 360000052,
10 “avgObjSize” : 36.0000052,
11 “storageSize” : 582864896,
12 “numExtents” : 18,
13 “nindexes” : 2,
14 “lastExtentSize” : 153874432,
15 “paddingFactor” : 1,
16 “systemFlags” : 1,
17 “userFlags” : 0,
18 “totalIndexSize” : 576040080,
19 “indexSizes” : {
20 “_id_” : 324456384,
21 “dim0_1” : 251583696
22 },
23 “ok” : 1
24}
从这其中,我们想要计算出现的不同值的个数。可以用下列MR任务轻松完成这个工作:
01> db.runCommand(
02{ mapreduce: “uniques”,
03map: function() { emit(this.dim0, 1); },
04reduce: function (key, values) { returnArray.sum(values); },
05out: “mrout” })
06{
07 “result” : “mrout”,
08 “timeMillis” : 1161960,
09 “counts” : {
10 “input” : 10000000,
11 “emit” : 10000000,
12 “reduce” : 1059138,
13 “output” : 999961
14 },
15 “ok” : 1
16}
正如你在输出内容中看到的,这耗费了大概1200秒(在EC2 M3实例上进行的测试)。有1千万个map,1百万个reduce,输出了999961个文档。结果就像下面这样:
01> db.mrout.find()
02{ “_id” : 1, “value” : 10 }
03{ “_id” : 2, “value” : 5 }
04{ “_id” : 3, “value” : 6 }
05{ “_id” : 4, “value” : 10 }
06{ “_id” : 5, “value” : 9 }
07{ “_id” : 6, “value” : 12 }
08{ “_id” : 7, “value” : 5 }
09{ “_id” : 8, “value” : 16 }
10{ “_id” : 9, “value” : 10 }
11{ “_id” : 10, “value” : 13 }
12…
使用排序
我在上一篇博文中提到了在MR中使用排序多么有益。这个特性很少被理解。在这个例子中,处理未排序的输入意味着MR引擎将得到随机顺序的值,在RAM中根本无法reduce。相反,它将不得不把所有文章写入一个临时收集的磁盘,然后按顺序读取并reduce。让我们看看使用排序是否有助:
01> db.runCommand(
02{ mapreduce: “uniques”,
03map: function() { emit(this.dim0, 1); },
04reduce: function (key, values) { returnArray.sum(values); },
05out: “mrout”,
06sort: {dim0: 1} })
07{
08 “result” : “mrout”,
09 “timeMillis” : 192589,
10 “counts” : {
11 “input” : 10000000,
12 “emit” : 10000000,
13 “reduce” : 1000372,
14 “output” : 999961
15 },
16 “ok” : 1
17}
确实大有助益!我们下降到192秒,已经提升了6倍。reduce的数量基本相同,但现在它们在写入磁盘前,可以在RAM内完成。
使用多线程
MongoDB对单独的MR作业并不使用多线程——它仅仅对多作业使用多线程。但通过多核CPU,在单个服务器使用Hadoop风格来并行作业非常有优势。我们需要做的是把输入分成几块,通过各个块来加速一个MR作业。也许数据集有简单的方法来分割,但其他使用splitVector命令(不明确)可以使你很快的找到分割点:
01> db.runCommand({splitVector: “test.uniques”, keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})
02{
03 “timeMillis” : 6006,
04 “splitKeys” : [
05 {
06 “dim0” : 18171
07 },
08 {
09 “dim0” : 36378
10 },
11 {
12 “dim0” : 54528
13 },
14 {
15 “dim0” : 72717
16 },
17…
18 {
19 “dim0” : 963598
20 },
21 {
22 “dim0” : 981805
23 }
24 ],
25 “ok” : 1
26}
这个命令在超过1千万个文档中找到分割点仅仅需要花费5秒,很快!那么现在我们仅仅需要一个方法来创建多个MR作业。从一个应用服务器,使用多线程和为MR命令使用$gt/$It查询 相当简单。通过shell,你可以使用ScopedThread,使用方法如下:
1> var t = new ScopedThread(mapred, 963598, 981805)
2> t.start()
3> t.join()
现在我们把一些快速运行的js代码放在一起,它们会产生4个线程(或者更多的线程),执行后呈现出下面的结果:
01> varres = db.runCommand({splitVector: “test.uniques”, keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
02> var keys = res.splitKeys
03> keys.length
0439
05> varmapred = function(min, max) {
06returndb.runCommand({ mapreduce: “uniques”,
07map: function() { emit(this.dim0, 1); },
08reduce: function (key, values) { returnArray.sum(values); },
09out: “mrout”+ min,
10sort: {dim0: 1},
11query: { dim0: { $gte: min, $lt: max } } }) }
12> var numThreads = 4
13> var inc = Math.floor(keys.length / numThreads) + 1
14> threads = []; for (var i = 0; i < numThreads; ++i) { varmin = (i == 0) ? 0 : keys[i * inc].dim0; varmax = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print(”min:”+ min + ” max:” + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
15min:0 max:274736
16min:274736 max:524997
17min:524997 max:775025
18min:775025 max:{ “$maxKey” : 1 }
19connecting to: test
20connecting to: test
21connecting to: test
22connecting to: test
23> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
24{
25 “result” : “mrout0”,
26 “timeMillis” : 205790,
27 “counts” : {
28 “input” : 2750002,
29 “emit” : 2750002,
30 “reduce” : 274828,
31 “output” : 274723
32 },
33 “ok” : 1
34}
35{
36 “result” : “mrout274736”,
37 “timeMillis” : 189868,
38 “counts” : {
39 “input” : 2500013,
40 “emit” : 2500013,
41 “reduce” : 250364,
42 “output” : 250255
43 },
44 “ok” : 1
45}
46{
47 “result” : “mrout524997”,
48 “timeMillis” : 191449,
49 “counts” : {
50 “input” : 2500014,
51 “emit” : 2500014,
52 “reduce” : 250120,
53 “output” : 250019
54 },
55″ok” : 1
56}
57{
58 “result” : “mrout775025”,
59 “timeMillis” : 184945,
60 “counts” : {
61 “input” : 2249971,
62 “emit” : 2249971,
63 “reduce” : 225057,
64 “output” : 224964
65 },
66 “ok” : 1
67}
68 “ok” : 1
69}
70{
71 “result” : “mrout775025”,
72 “timeMillis” : 184945,
73 “counts” : {
74 “input” : 2249971,
75 “emit” : 2249971,
76 “reduce” : 225057,
77 “output” : 224964
78 },
79 “ok” : 1
80}
第一个线程时间确实超过了其他的线程,但是平均每个线程仍然用了大约190s的时间.这意味着并没有一个线程快!这有点奇怪,自从用了‘top’,在某种程度上,你可以看到所有的内核运行情况。
使用多数据库
问题是在多线程之间会有很多锁竞争。在上锁时,MR并不是那么无私的(它每1000次读操作就会产生一次锁定),而且MR任务还会执行许多写操作,导致线程最终都会在等待另一个线程。由于每个MongoDB数据库都有私有锁,让我们尝试为每一个线程使用一个不同的输出数据库:
01> varmapred = function(min, max) {
02returndb.runCommand({ mapreduce: “uniques”,
03map: function() { emit(this.dim0, 1); },
04reduce: function (key, values) { returnArray.sum(values); },
05out: { replace: “mrout” + min, db: “mrdb”+ min },
06sort: {dim0: 1},
07query: { dim0: { $gte: min, $lt: max } } }) }
08> threads = []; for (var i = 0; i < numThreads; ++i) { varmin = (i == 0) ? 0 : keys[i * inc].dim0; varmax = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print(”min:”+ min + ” max:” + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
09min:0 max:274736
10min:274736 max:524997
11min:524997 max:775025
12min:775025 max:{ “$maxKey” : 1 }
13connecting to: test
14connecting to: test
15connecting to: test
16connecting to: test
17> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
18…
19{
20 “result” : {
21 “db” : “mrdb274736”,
22 “collection” : “mrout274736”
23 },
24 “timeMillis” : 105821,
25 “counts” : {
26 “input” : 2500013,
27 “emit” : 2500013,
28 “reduce” : 250364,
29 “output” : 250255
30 },
31 “ok” : 1
32}
33…
这才像话!我们现在降到了100秒,这意味着相比一个线程而言已经提升了2倍。还算差强人意吧。现在我们只有4个核所以只快了2倍,要是在8核CPU上将会快4倍,以此类推。
使用纯JavaScript模式
当把输入数据拆分到不同线程上去的时候,发生了一些有趣的事情:每个线程现在有大约250000个不同的值来输出,而不是1百万。这意味着我们可以使用“纯JS模式”,它可以通过使用jsMode:true来开启。开启后,MongoDB在处理时将不会把对象在JS和BSON之间来回翻译,相反,它使用一个限额500000个key的内部JS字典来化简所有对象。让我们看看这是否有用:
01> varmapred = function(min, max) {
02returndb.runCommand({ mapreduce: “uniques”,
03map: function() { emit(this.dim0, 1); },
04reduce: function (key, values) { returnArray.sum(values); },
05out: { replace: “mrout” + min, db: “mrdb”+ min },
06sort: {dim0: 1},
07query: { dim0: { $gte: min, $lt: max } },
08jsMode: true }) }
09> threads = []; for (var i = 0; i < numThreads; ++i) { varmin = (i == 0) ? 0 : keys[i * inc].dim0; varmax = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print(”min:”+ min + ” max:” + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
10min:0 max:274736
11min:274736 max:524997
12min:524997 max:775025
13min:775025 max:{ “$maxKey” : 1 }
14connecting to: test
15connecting to: test
16connecting to: test
17connecting to: test
18> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
19…
20{
21 “result” : {
22 “db” : “mrdb274736”,
23 “collection” : “mrout274736”
24 },
25 “timeMillis” : 70507,
26 “counts” : {
27 “input” : 2500013,
28 “emit” : 2500013,
29 “reduce” : 250156,
30 “output” : 250255
31 },
32 “ok” : 1
33}
34…
现在我们降到了70秒,就搞定了任务!jsMode真心有用,尤其是当对象有很多字段的时候。这里只有一个数字字段就已经下降了30%。
MongoDB在2.6版本上的改进
在很早的2.6版本中,在任何的js函数调用的时候,我们就通过一段代码设置一个可选参数”args“。这种做法并不标准,不在使用。但是它确有留下来的原因(查看 SERVER-4654)。让我们从Git资源库中导入MongoDB,编译并运行进行测试:
01…
02{
03 “result” : {
04 “db” : “mrdb274736”,
05 “collection” : “mrout274736”
06 },
07 “timeMillis” : 62785,
08 “counts” : {
09 “input” : 2500013,
10 “emit” : 2500013,
11 “reduce” : 250156,
12 “output” : 250255
13 },
14 “ok” : 1
15}
16…
这是明显的提高了3倍的运行速度,时间降低到了60s,大约10-15%。这种变化也提高了整体JS引擎的堆消耗。
结语
回顾一下,对于同一个MR作业,我们开始时花费1200秒,最后花费60秒,提升了20倍!这项提高应该对大部分应用都有效,即使有些trick不太理想(例如,使用多种输出dbs/collections)。至少这能提供给人们思路,如何加速他们的MR作业,希望这些特征在将来会更加易于使用。
我们一直都在努力坚持原创.......请不要一声不吭,就悄悄拿走。
我原创,你原创,我们的内容世界才会更加精彩!
【所有原创内容版权均属TechTarget,欢迎大家转发分享。但未经授权,严禁任何媒体(平面媒体、网络媒体、自媒体等)以及微信公众号复制、转载、摘编或以其他方式进行使用。】
微信公众号
TechTarget
官方微博
TechTarget中国
翻译
相关推荐
-
MongoDB与Cassandra数据库对比
MongoDB和Cassandra都属于NoSQL数据库系列,它们也恰好都是开源,但是,它们的相似之处仅此而已 […]
-
eHarmony公司利用Redis NoSQL数据库进行热存储
虽然关系型数据库不会消失,但关系型数据库管理系统有时仅在会话管理、推荐引擎和模式匹配等关键Web应用程序中担当 […]
-
2017年1月数据库流行度排行榜 新年新气象
新年新气象,数据库知识网站DB-engines最近更新了2017年1月份数据库流行度榜单。TechTarget数据库网站将与您分享1月份的榜单排名情况,让我们拭目以待。
-
2016年12月数据库流行度排行榜 几家欢乐几家愁
在过去的6个月中,数据库排行榜的前二十名总体上没有太大的变动,那么数据库知识网站DB-engines最近更新的2016年12月份数据库流行度排名情况是否一如既往的沉寂、低调呢?