? ? ? ? Mapreduce中mapper個數(shù)的確定
1)Mapreduce中mapper個數(shù)的確定: 在map階段讀取數(shù)據(jù)前,F(xiàn)ileInputFormat會將輸入文件分割成split。split的個數(shù)決定了map的個數(shù)。 影響map個數(shù),即split個數(shù)的因素主要有: 1)HDFS塊的大小,即HDFS中dfs.block.size的值。如果有一個輸入文件為1024m,當(dāng)塊為256m時,會被劃分為4個split;當(dāng)塊為128m時,會被劃分為8個split。
2)文件的大小。當(dāng)塊為128m時,如果輸入文件為128m,會被劃分為1個split;當(dāng)塊為256m,會被劃分為2個split。
3)文件的個數(shù)。FileInputFormat按照文件分割split,并且只會分割大文件,即那些大小超過HDFS塊的大小的文件。如果HDFS中dfs.block.size設(shè)置為64m,而輸入的目錄中文件有100個,則劃分后的split個數(shù)至少為100個。
4)splitsize的大小。分片是按照splitszie的大小進行分割的,一個split的大小在沒有設(shè)置的情況下,默認等于hdfs block的大小。但應(yīng)用程序可以通過兩個參數(shù)來對splitsize進行調(diào)節(jié)。 Mapper個數(shù)的計算如下:
Step1,splitsize=max(minimumsize,min(maximumsize,blocksize))。 如果沒有設(shè)置minimumsize和maximumsize,splitsize的大小默認等于blocksize
Step2,計算過程可以簡化為如下的公式,詳細算法可以參照FileInputSplit類中的getSplits方法
total_split for(file :輸入目錄中的每個文件)
{ file_split = 1;
if(file.size》splitsize)
{ file_split=file_size/splitsize;
}
total_split+=file_split; }
Mapreduce中Reducer個數(shù)確定:
1,在缺省情況下,一個mapreduce的job只有一個reducer;在大型集群中,需要使用許多reducer,中間數(shù)據(jù)都會放到一個reducer中處理,如果reducer數(shù)量不夠,會成為計算瓶頸。 2,reducer的最優(yōu)個數(shù)與集群中可用的reducer的任務(wù)槽數(shù)相關(guān),一般設(shè)置比總槽數(shù)稍微少一些的reducer數(shù)量;Hadoop文檔中推薦了兩個公式: 0.95*NUMBER_OF_NODES*mapred.tasktracker.reduce.tasks.maximum 1.75*NUMBER_OF_NODES*mapred.tasktracker.reduce.tasks.maximum
備注:NUMBER_OF_NODES是集群中的計算節(jié)點個數(shù); mapred.tasktracker.reduce.tasks.maximum:每個節(jié)點所分配的reducer任務(wù)槽的個數(shù)(節(jié)點內(nèi)核數(shù));
2,在代碼中通過:JobConf.setNumReduceTasks(Int numOfReduceTasks)方法設(shè)置reducer的個數(shù);
Hive job相關(guān)參數(shù)配置和mapreduce數(shù)目控制
在 hive\conf\hive_site.xml中配置如下性能調(diào)優(yōu)項:
開啟動態(tài)分區(qū): hive.exec.dynamic.partition=true
默認值:false
描述:是否允許動態(tài)分區(qū) hive.exec.dynamic.partition.mode=nonstrict
默認值:strict
描述:strict是避免全分區(qū)字段是動態(tài)的,必須有至少一個分區(qū)字段是指定有值的。
讀取表的時候可以不指定分區(qū)。 hive.exec.max.dynamic.partitions.pernode=100
默認值:100
描述:each mapper or reducer可以創(chuàng)建的最大動態(tài)分區(qū)數(shù) hive.exec.max.dynamic.partitions=1000
默認值:1000
描述:一個DML操作可以創(chuàng)建的最大動態(tài)分區(qū)數(shù) hive.exec.max.created.files=100000 默認值:100000
描述:一個DML操作可以創(chuàng)建的文件數(shù) 設(shè)置如下參數(shù)取消一些限制(HIVE 0.7后沒有此限制): hive.merge.mapfiles=false
默認值:true
描述:是否合并Map的輸出文件,也就是把小文件合并成一個map hive.merge.mapredfiles=false
默認值:false
描述:是否合并Reduce的輸出文件,也就是在Map輸出階段做一次reduce操作,再輸出 hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 表示執(zhí)行前進行小文件合并 配置如下參數(shù),可以開啟Hive的本地模式: hive.exec.mode.local.auto=true;(默認為false) 自0.7版本后Hive開始支持任務(wù)執(zhí)行選擇本地模式(local mode),如此一來,對數(shù)據(jù)量比較小的操作,就可以在本地執(zhí)行,這樣要比提交任務(wù)到集群執(zhí)行效率要快很多。
mapred.reduce.tasks; 設(shè)置當(dāng)前Session的map,reduce 的個數(shù),默認值是-1,為系統(tǒng)自動匹配。
一、控制hive任務(wù)中的map數(shù):
1. 通常情況下,作業(yè)會通過input的目錄產(chǎn)生一個或者多個map任務(wù)。 主要的決定因素有: input的文件總個數(shù),input的文件大小,集群設(shè)置的文件塊大小(hadoop\hdfs_site.xml中dfs.block.size的值;在HIVE中用set dfs.block.size命令查看到,該參數(shù)在HIVE中不能自定義修改);
2. 舉例:
a) 假設(shè)input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128m的塊和1個12m的塊),從而產(chǎn)生7個map數(shù)
b)假設(shè)input目錄下有3個文件a,b,c,大小分別為10m,20m,130m,那么hadoop會分隔成4個塊(10m,20m,128m,2m),從而產(chǎn)生4個map數(shù)
即,如果文件大于塊大?。?28m),那么會拆分,如果小于塊大小,則把該文件當(dāng)成一個塊。
3. 是不是map數(shù)越多越好? 答案是否定的。如果一個任務(wù)有很多小文件(遠遠小于塊大小128m),則每個小文件也會被當(dāng)做一個塊,用一個map任務(wù)來完成, 而一個map任務(wù)啟動和初始化的時間遠遠大于邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執(zhí)行的map數(shù)是受限的。
4.是不是保證每個map處理接近128m的文件塊,就高枕無憂了?
答案也是不一定。比如有一個127m的文件,正常會用一個map去完成, 但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄, 如果map處理的邏輯比較復(fù)雜,用一個map任務(wù)去做,肯定也比較耗時。
針對上面的問題3和4,我們需要采取兩種方式來解決:即減少map數(shù)和增加map數(shù);
如何合并小文件,減少map數(shù)?
假設(shè)一個SQL任務(wù):
Select count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’;
該任務(wù)的inputdir/group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04共有194個文件,其中很多是遠遠小于128m的小文件,總大小9G,正常執(zhí)行會用194個map任務(wù)。Map總共消耗的計算資源: SLOTS_MILLIS_MAPS= 623,020通過以下方法來在map執(zhí)行前合并小文件,減少map數(shù):
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
再執(zhí)行上面的語句,用了74個map任務(wù),map消耗的計算資源:
SLOTS_MILLIS_MAPS= 333,500
對于這個簡單SQL任務(wù),執(zhí)行時間上可能差不多,但節(jié)省了一半的計算資源。 大概解釋一下,100000000表示100M,
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
這個參數(shù)表示執(zhí)行前進行小文件合并。 前面三個參數(shù)確定合并文件塊的大小,大于文件塊大小128m的,按照128m來分隔,小于128m,大于100m的,按照100m來分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),進行合并,最終生成了74個塊。 如何適當(dāng)?shù)脑黾觤ap數(shù)? 當(dāng)input的文件都很大,任務(wù)邏輯復(fù)雜,map執(zhí)行非常慢的時候,可以考慮增加Map數(shù), 來使得每個map處理的數(shù)據(jù)量減少,從而提高任務(wù)的執(zhí)行效率。 假設(shè)有這樣一個任務(wù):
Select data_desc,
count(1),
count(distinct id),
sum(case when …), sum(case when 。。.),
sum(…) from a group by data_desc
如果表a只有一個文件,大小為120M,但包含幾千萬的記錄, 如果用1個map去完成這個任務(wù),肯定是比較耗時的, 這種情況下,我們要考慮將這一個文件合理的拆分成多個, 這樣就可以用多個map任務(wù)去完成。
set mapred.reduce.tasks=10;
create table a_1 as select * from a distribute by rand(123);
這樣會將a表的記錄,隨機的分散到包含10個文件的a_1表中,再用a_1代替上面sql中的a表,則會用10個map任務(wù)去完成。 每個map任務(wù)處理大于12M(幾百萬記錄)的數(shù)據(jù),效率肯定會好很多。 看上去,貌似這兩種有些矛盾,一個是要合并小文件,一個是要把大文件拆成小文件,這點正是重點需要關(guān)注的地方, 根據(jù)實際情況,控制map數(shù)量需要遵循兩個原則:使大數(shù)據(jù)量利用合適的map數(shù);使單個map任務(wù)處理合適的數(shù)據(jù)量;
二、控制hive任務(wù)的reduce數(shù):
1. Hive自己如何確定reduce數(shù):
reduce個數(shù)的設(shè)定極大影響任務(wù)執(zhí)行效率,不指定reduce個數(shù)的情況下(mapred.reduce.tasks = -1),Hive會猜測確定一個reduce個數(shù),基于以下兩個設(shè)定: hive.exec.reducers.bytes.per.reducer(每個reduce任務(wù)處理的數(shù)據(jù)量,默認為1000^3=1G) hive.exec.reducers.max(每個任務(wù)最大的reduce數(shù),默認為999) 計算reducer數(shù)的公式很簡單N=min(參數(shù)2,總輸入數(shù)據(jù)量/參數(shù)1) 即,如果reduce的輸入(map的輸出)總大小不超過1G,那么只會有一個reduce任務(wù);如: select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt; /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04 總大小為9G多, 因此這句有10個reduce
2. 調(diào)整reduce個數(shù)方法一:
調(diào)整hive.exec.reducers.bytes.per.reducer參數(shù)的值;
set hive.exec.reducers.bytes.per.reducer=500000000; (500M) select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt; 這次有20個reduce 3.調(diào)整reduce個數(shù)方法二:
set mapred.reduce.tasks = 15; select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt;這次有15個reduce
3. reduce個數(shù)并不是越多越好; 同map一樣,啟動和初始化reduce也會消耗時間和資源; 另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件, 那么如果這些小文件作為下一個任務(wù)的輸入,則也會出現(xiàn)小文件過多的問題;
什么情況下只有一個reduce?
很多時候你會發(fā)現(xiàn)任務(wù)中不管數(shù)據(jù)量多大,不管你有沒有設(shè)置調(diào)整reduce個數(shù)的參數(shù),任務(wù)中一直都只有一個reduce任務(wù); 其實只有一個reduce任務(wù)的情況,除了數(shù)據(jù)量小于hive.exec.reducers.bytes.per.reducer參數(shù)值的情況外,還有以下原因:
a) 沒有g(shù)roup by的匯總,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt;
寫成 select count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’;
b)用了Order by
有笛卡爾積
通常這些情況下,除了找辦法來變通和避免,我暫時沒有什么好的辦法, 因為這些操作都是全局的,所以hadoop不得不用一個reduce去完成;
同樣的,在設(shè)置reduce個數(shù)的時候也需要考慮這兩個原則: 使大數(shù)據(jù)量利用合適的reduce數(shù);使單個reduce任務(wù)處理合適的數(shù)據(jù)量。 hive.exec.parallel參數(shù)控制在同一個sql中的不同的job是否可以同時運行,默認為false.
下面是對于該參數(shù)的測試過程:
測試sql: select r1.a from ( select t.a from sunwg_10 t join sunwg_10000000 s on t.a=s.b) r1 join (select s.b from sunwg_100000 t join sunwg_10 s on t.a=s.b) r2 on (r1.a=r2.b);
當(dāng)參數(shù)為false的時候,三個job是順序的執(zhí)行 set hive.exec.parallel=false, 但是可以看出來其實兩個子查詢中的sql并無關(guān)系,可以并行的跑 set hive.exec.parallel=true; 總結(jié): 在資源充足的時候hive.exec.parallel會讓那些存在并發(fā)job的sql運行得更快,但同時消耗更多的資源可以評估下hive.exec.parallel對我們的刷新任務(wù)是否有幫助。
評論
查看更多