v mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle;
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領域值得信任、有價值的長期合作伙伴,公司提供的服務項目有:域名與空間、虛擬主機、營銷軟件、網站建設、耀州網站維護、網站推廣。
v shuffle: 洗牌、發(fā)牌——(核心機制:數據分區(qū),排序,緩存);
v 具體來說:就是將maptask輸出的處理結果數據,分發(fā)給reducetask,并在分發(fā)的過程中,對數據按key進行了分區(qū)和排序;
Shuffle緩存流程:
shuffle是MR處理流程中的一個過程,它的每一個處理步驟是分散在各個map task和reduce task節(jié)點上完成的,整體來看,分為3個操作:
1、分區(qū)partition
2、Sort根據key排序
3、Combiner進行局部value的合并
1、 maptask收集我們的map()方法輸出的kv對,放到內存緩沖區(qū)中
2、 從內存緩沖區(qū)不斷溢出本地磁盤文件,可能會溢出多個文件
3、 多個溢出文件會被合并成大的溢出文件
4、 在溢出過程中,及合并的過程中,都要調用partitoner進行分組和針對key進行排序
5、 reducetask根據自己的分區(qū)號,去各個maptask機器上取相應的結果分區(qū)數據
6、 reducetask會取到同一個分區(qū)的來自不同maptask的結果文件,reducetask會將這些文件再進行合并(歸并排序)
7、 合并成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
Shuffle中的緩沖區(qū)大小會影響到mapreduce程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤io的次數越少,執(zhí)行速度就越快
緩沖區(qū)的大小可以通過參數調整, 參數:io.sort.mb 默認100M
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,header,繼承體系。。。。),不便于在網絡中高效傳輸;
所以,hadoop自己開發(fā)了一套序列化機制(Writable),精簡,高效
簡單代碼驗證兩種序列化機制的差別:
public class TestSeri { public static void main(String[] args) throws Exception { //定義兩個ByteArrayOutputStream,用來接收不同序列化機制的序列化結果 ByteArrayOutputStream ba = new ByteArrayOutputStream(); ByteArrayOutputStream ba2 = new ByteArrayOutputStream();
//定義兩個DataOutputStream,用于將普通對象進行jdk標準序列化 DataOutputStream dout = new DataOutputStream(ba); DataOutputStream dout2 = new DataOutputStream(ba2); ObjectOutputStream obout = new ObjectOutputStream(dout2); //定義兩個bean,作為序列化的源對象 ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f); ItemBean itemBean = new ItemBean(1000L, 89.9f);
//用于比較String類型和Text類型的序列化差別 Text atext = new Text("a"); // atext.write(dout); itemBean.write(dout);
byte[] byteArray = ba.toByteArray();
//比較序列化結果 System.out.println(byteArray.length); for (byte b : byteArray) {
System.out.print(b); System.out.print(":"); }
System.out.println("-----------------------");
String astr = "a"; // dout2.writeUTF(astr); obout.writeObject(itemBeanSer);
byte[] byteArray2 = ba2.toByteArray(); System.out.println(byteArray2.length); for (byte b : byteArray2) { System.out.print(b); System.out.print(":"); } } } |
如果需要將自定義的bean放在key中傳輸,則還需要實現comparable接口,因為mapreduce框中的shuffle過程一定會對key進行排序,此時,自定義的bean實現的接口應該是:
public class FlowBean implements WritableComparable
需要自己實現的方法是:
/** * 反序列化的方法,反序列化時,從流中讀取到的各個字段的順序應該與序列化時寫出去的順序保持一致 */ @Override public void readFields(DataInput in) throws IOException { upflow = in.readLong(); dflow = in.readLong(); sumflow = in.readLong();
}
/** * 序列化的方法 */ @Override public void write(DataOutput out) throws IOException {
out.writeLong(upflow); out.writeLong(dflow); //可以考慮不序列化總流量,因為總流量是可以通過上行流量和下行流量計算出來的 out.writeLong(sumflow);
} @Override public int compareTo(FlowBean o) { //實現按照sumflow的大小倒序排序 return sumflow>o.getSumflow()?-1:1; } |
Yarn是一個資源調度平臺,負責為運算程序提供服務器運算資源,相當于一個分布式的操作系統(tǒng)平臺,而mapreduce等運算程序則相當于運行于操作系統(tǒng)之上的應用程序
1、 yarn并不清楚用戶提交的程序的運行機制
2、 yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)
3、 yarn中的主管角色叫ResourceManager
4、 yarn中具體提供運算資源的角色叫NodeManager
5、 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味著yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez……
6、 所以,spark、storm等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規(guī)范的資源請求機制即可
7、 Yarn就成為一個通用的資源調度平臺,從此,企業(yè)中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享
mapreduce程序的調度過程,如下圖