云智慧(北京)科技有限公司陳鑫
創(chuàng)新互聯(lián)自成立以來(lái),一直致力于為企業(yè)提供從網(wǎng)站策劃、網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)、電子商務(wù)、網(wǎng)站推廣、網(wǎng)站優(yōu)化到為企業(yè)提供個(gè)性化軟件開(kāi)發(fā)等基于互聯(lián)網(wǎng)的全面整合營(yíng)銷服務(wù)。公司擁有豐富的網(wǎng)站建設(shè)和互聯(lián)網(wǎng)應(yīng)用系統(tǒng)開(kāi)發(fā)管理經(jīng)驗(yàn)、成熟的應(yīng)用系統(tǒng)解決方案、優(yōu)秀的網(wǎng)站開(kāi)發(fā)工程師團(tuán)隊(duì)及專業(yè)的網(wǎng)站設(shè)計(jì)師團(tuán)隊(duì)。
NullWritable
不想輸出的時(shí)候,把它當(dāng)做key。NullWritable是Writable的一個(gè)特殊類,序列化的長(zhǎng)度為0,實(shí)現(xiàn)方法為空實(shí)現(xiàn),不從數(shù)據(jù)流中讀數(shù)據(jù),也不寫(xiě)入數(shù)據(jù),只充當(dāng)占位符,如在MapReduce中,如果你不需要使用鍵或值,你就可以將鍵或值聲明為NullWritable,NullWritable是一個(gè)不可變的單實(shí)例類型。
FileInputFormat繼承于InputFormat
InputFormat的作用:
驗(yàn)證輸入規(guī)范;
切分輸入文件為InputSpilts;
提供RecordReader來(lái)收集InputSplit中的輸入記錄,給Mapper進(jìn)行執(zhí)行。
RecordReader
將面向字節(jié)的InputSplit轉(zhuǎn)換為面向記錄的視圖,供Mapper或者Reducer使用運(yùn)行。因此假定處理記錄的責(zé)任界限,為任務(wù)呈現(xiàn)key-value。
SequenceFile:
SequenceFile是包含二進(jìn)制kv的扁平文件(序列化)。它提供Writer、Reader、Sorter來(lái)進(jìn)行寫(xiě)、讀、排序功能。基于CompressionType,SequenceFile有三種對(duì)于kv的壓縮方式:
Writer:不壓縮records;
RecordCompressWriter:只壓縮values;
BlockCompressWriter: 壓縮records,keys和values都被分開(kāi)壓縮在block中,block的大小可以配置;
壓縮方式由合適的CompressionCodec指定。推薦使用此類的靜態(tài)方法createWriter來(lái)選擇格式。Reader作為橋接可以讀取以上任何一種壓縮格式。
CompressionCodec:
封裝了關(guān)于流式壓縮/解壓縮的相關(guān)方法。
Mapper
Mapper將輸入的kv對(duì)映射成中間數(shù)據(jù)kv對(duì)集合。Maps將輸入記錄轉(zhuǎn)變?yōu)橹虚g記錄,其中被轉(zhuǎn)化后的記錄不必和輸入記錄類型相同。一個(gè)給定的輸入對(duì)可以映射為0或者多個(gè)輸出對(duì)。
在MRJob執(zhí)行過(guò)程中,MapReduce框架根據(jù)提前指定的InputFormat(輸入格式對(duì)象)產(chǎn)生InputSplit(輸入分片),而每個(gè)InputSplit將會(huì)由一個(gè)map任務(wù)處理。
總起來(lái)講,Mapper實(shí)現(xiàn)類通過(guò)JobConfigurable.configure(JobConf)方法傳入JobConf對(duì)象來(lái)初始化,然后在每個(gè)map任務(wù)中調(diào)用map(WritableComparable,Writable,OutputCollector,Reporter)方法處理InputSplit的每個(gè)kv對(duì)。MR應(yīng)用可以覆蓋Closeable.close方法去處理一些必須的清理工作。
輸出對(duì)不一定和輸入對(duì)類型相同。一個(gè)給定的輸入對(duì)可能映射成0或者很多的輸出對(duì)。輸出對(duì)是框架通過(guò)調(diào)用OutputCollector.colect(WritableComparable,Writable)得到。
MR應(yīng)用可以使用Reporter匯報(bào)進(jìn)度,設(shè)置應(yīng)用層級(jí)的狀態(tài)信息,更新計(jì)數(shù)器或者只是顯示應(yīng)用處于運(yùn)行狀態(tài)等。
所有和給定的輸出key關(guān)聯(lián)的中間數(shù)據(jù)都會(huì)隨后被框架分組處理,并傳給Reducer處理以產(chǎn)生最終的輸出。用戶可以通過(guò)JobConf.setOutputKeyComparatorClass(Class)指定一個(gè)Comparator控制分組處理過(guò)程。
Mapper輸出都被排序后根據(jù)Reducer數(shù)量進(jìn)行分區(qū),分區(qū)數(shù)量等于reduce任務(wù)數(shù)量。用戶可以通過(guò)實(shí)現(xiàn)自定義的Partitioner來(lái)控制哪些keys(記錄)到哪個(gè)Reducer中去。
此外,用戶還可以指定一個(gè)Combiner,調(diào)用JobConf.setCombinerClass(Class)來(lái)實(shí)現(xiàn)。這個(gè)可以來(lái)對(duì)map輸出做本地的聚合,有助于減少?gòu)膍apper到reducer的數(shù)據(jù)量。
經(jīng)過(guò)排序的中間輸出數(shù)據(jù)通常以一種簡(jiǎn)單的格式(key-len,key,value-len,value)存儲(chǔ)在SequenceFile中。應(yīng)用可以決定是否或者怎樣被壓縮以及壓縮格式,可以通過(guò)JobConf來(lái)指定CompressionCodec.
如果job沒(méi)有reducer,那么mapper的輸出結(jié)果會(huì)不經(jīng)過(guò)分組排序,直接寫(xiě)進(jìn)FileSystem.
Map數(shù)
通常map數(shù)由輸入數(shù)據(jù)總大小決定,也就是所有輸入文件的blocks數(shù)目決定。
每個(gè)節(jié)點(diǎn)并行的運(yùn)行的map數(shù)正常在10到100個(gè)。由于Map任務(wù)初始化本身需要一段時(shí)間所以map運(yùn)行時(shí)間至少在1分鐘為好。
如此,如果有10T的數(shù)據(jù)文件,每個(gè)block大小128M,最大使用為82000map數(shù),除非使用setNumMapTasks(int)(這個(gè)方法僅僅對(duì)MR框架提供一個(gè)建議值)將map數(shù)值設(shè)置到更高。
Reducer
Reducer根據(jù)key將中間數(shù)據(jù)集合處理合并為更小的數(shù)據(jù)結(jié)果集。
用戶可以通過(guò)JobConf.setNumReduceTasks(int)設(shè)置作業(yè)的reducer數(shù)目。
整體而言,Reducer實(shí)現(xiàn)類通過(guò)JobConfigurable.configure(JobConf)方法將JobConf對(duì)象傳入,并為Job設(shè)置和初始化Reducer。MR框架調(diào)用 reduce(WritableComparable, Iterator, OutputCollector,Reporter)來(lái)處理以key被分組的輸入數(shù)據(jù)。應(yīng)用可以覆蓋Closeable.close()處理必要的清理操作。
Reducer由三個(gè)主要階段組成:shuffle,sort,reduce。
shuffle
輸入到Reducer的輸入數(shù)據(jù)是Mapper已經(jīng)排過(guò)序的數(shù)據(jù).在shuffle階段,根據(jù)partition算法獲取相關(guān)的mapper地址,并通過(guò)Http協(xié)議將mapper的相應(yīng)輸出數(shù)據(jù)由reducer拉取到reducer機(jī)器上處理。
sort
框架在這個(gè)階段會(huì)根據(jù)key對(duì)reducer的輸入進(jìn)行分組(因?yàn)椴煌膍apper輸出的數(shù)據(jù)中可能含有相同的key)。
shuffle和sort是同時(shí)進(jìn)行的,同時(shí)reducer仍然在拉取map的輸出。
Secondary Sort
如果對(duì)中間數(shù)據(jù)key進(jìn)行分組的規(guī)則和在處理化簡(jiǎn)階段前對(duì)key分組規(guī)則不一致時(shí),可以通過(guò)JobConf.setOutputValueGroupingComparator(Class)設(shè)置一個(gè)Comparator。因?yàn)橹虚g數(shù)據(jù)的分組策略是通過(guò)JobConf.setOutputKeyComparatorClass(Class)設(shè)置的,可以控制中間數(shù)據(jù)根據(jù)哪些key進(jìn)行分組。而JobConf.setOutputValueGroupingComparator(Class)則可用于在數(shù)據(jù)連接情況下對(duì)value進(jìn)行二次排序。
Reduce(化簡(jiǎn))
這個(gè)階段框架循環(huán)調(diào)用 reduce(WritableComparable, Iterator, OutputCollector,Reporter)方法處理被分組的每個(gè)kv對(duì)。
reduce任務(wù)一般通過(guò)OutputCollector.collect(WritableComparable, Writable)將輸出數(shù)據(jù)寫(xiě)入文件系統(tǒng)FileSystem。應(yīng)用可以使用Reporter匯報(bào)作業(yè)執(zhí)行進(jìn)度、設(shè)置應(yīng)用層級(jí)的狀態(tài)信息并更新計(jì)數(shù)器(Counter),或者只是提示作業(yè)在運(yùn)行。
注意,Reducer的輸出不會(huì)再進(jìn)行排序。
Reducer數(shù)目
合適的reducer數(shù)目可以這樣估算:(節(jié)點(diǎn)數(shù)目mapred.tasktracker.reduce.tasks.maximum)乘以0.95或乘以1.75。因子為0.95時(shí),當(dāng)所有map任務(wù)完成時(shí)所有reducer可以立即啟動(dòng),并開(kāi)始從map機(jī)器上拉取數(shù)據(jù)。因子為1.75時(shí),最快的一些節(jié)點(diǎn)將完成第一輪reduce處理,此時(shí)框架開(kāi)始啟動(dòng)第二輪reduce任務(wù),這樣可以達(dá)到比較好的作業(yè)負(fù)載均衡。提高reduce數(shù)目會(huì)增加框架的運(yùn)行負(fù)擔(dān),但有利于提升作業(yè)的負(fù)載均衡并降低失敗的成本。上述的因子使用最好在作業(yè)執(zhí)行時(shí)框架仍然有reduce槽為前提,畢竟框架還需要對(duì)作業(yè)進(jìn)行可能的推測(cè)執(zhí)行和失敗任務(wù)的處理。
不使用Reducer
如果不需要進(jìn)行化簡(jiǎn)處理,可以將reduce數(shù)目設(shè)為0。這種情況下,map的輸出會(huì)直接寫(xiě)入到文件系統(tǒng)。輸出路徑通過(guò)setOutputPath(Path)指定??蚣茉趯?xiě)入數(shù)據(jù)到文件系統(tǒng)之前不再對(duì)map結(jié)果進(jìn)行排序。
Partitioner
Partitioner對(duì)數(shù)據(jù)按照key進(jìn)行分區(qū),從而控制map的輸出傳輸?shù)侥膫€(gè)reducer上。默認(rèn)的Partitioner算法是hash(哈希。分區(qū)數(shù)目由作業(yè)的reducer數(shù)目決定。HashPartitioner是默認(rèn)的Partitioner。
Reporter
Reporter為MR應(yīng)用提供了進(jìn)度報(bào)告、應(yīng)用狀態(tài)信息設(shè)置,和計(jì)數(shù)器(Counter)更新等功能.
Mapper和Reducer實(shí)現(xiàn)可以使用Reporter匯報(bào)進(jìn)度或者提示作業(yè)在正常運(yùn)行。在一些場(chǎng)景下,應(yīng)用在處理一些特殊的kv對(duì)時(shí)耗費(fèi)了過(guò)多時(shí)間,這個(gè)可能會(huì)因?yàn)榭蚣芗俣ㄈ蝿?wù)超時(shí)而強(qiáng)制停止了這些作業(yè)。為避免該情況,可以設(shè)置mapred.task.timeout為一個(gè)比較高的值或者將其設(shè)置為0以避免超時(shí)發(fā)生。
應(yīng)用也可以使用Reporter來(lái)更新計(jì)數(shù)(Counter)。
OutputCollector
OutputCollector是MR框架提供的通用工具來(lái)收集Mapper或者Reducer輸出數(shù)據(jù)(中間數(shù)據(jù)或者最終結(jié)果數(shù)據(jù))。
HadoopMapReduce提供了一些經(jīng)常使用的mapper、reducer和partioner的實(shí)現(xiàn)類供我們進(jìn)行學(xué)習(xí)。
以上有關(guān)configuration和job的部分在新的API中有所改變,簡(jiǎn)單說(shuō)就是在Mapper和Reducer中引入了MapContext和ReduceContext,它們封裝了configuration和outputcollector,以及reporter。