本篇內(nèi)容介紹了“Storm并發(fā)度怎么設(shè)置”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
創(chuàng)新新互聯(lián),憑借十載的網(wǎng)站設(shè)計(jì)制作、網(wǎng)站建設(shè)經(jīng)驗(yàn),本著真心·誠(chéng)心服務(wù)的企業(yè)理念服務(wù)于成都中小企業(yè)設(shè)計(jì)網(wǎng)站有超過千家案例。做網(wǎng)站建設(shè),選創(chuàng)新互聯(lián)。Storm架構(gòu):master/slave
主節(jié)點(diǎn):Nimbus
負(fù)責(zé)在集群上進(jìn)行任務(wù)(Topology)的分發(fā)與資源的調(diào)度以及監(jiān)控
工作節(jié)點(diǎn):Supervisor
接收到任務(wù)請(qǐng)求后,啟動(dòng)一個(gè)或多個(gè)Worker進(jìn)程來處理任務(wù);默認(rèn)情況下,一個(gè)Supervisor最多啟動(dòng)4個(gè)Worker
工作進(jìn)程:Worker
在Supervisor中的子進(jìn)程,存在著若干個(gè)Spout和Bolt線程,來負(fù)責(zé)Spout和Bolt組件處理任務(wù)(實(shí)際是開啟的executor線程)
作業(yè):Topologies(死循環(huán),不會(huì)結(jié)束)
Spout:獲取數(shù)據(jù)的組件
Bolt:處理數(shù)據(jù)的組件
Stream:Spout和Bolt之間數(shù)據(jù)流動(dòng)的通道
Tuple:
1)Stream的最小組成單位,Spout向Bolt發(fā)送一次數(shù)據(jù)叫一個(gè)Tuple
2)同一個(gè)Stream中Tuple的類型相同,不同的Stream中可能相同/不同
3)一個(gè)key-value形式的Map
數(shù)據(jù)流分發(fā)策略(Stream groupings):
解決Spout和Bolt之間數(shù)據(jù)傳輸(發(fā)送Tuple元組)的問題
1)shuffleGrouping:
隨機(jī)派發(fā)Stream中的Tuple到Bolt中
2)fieldsGrouping:
根據(jù)字段的哈希值與Bolt個(gè)數(shù)進(jìn)行取模操作然后進(jìn)行分組發(fā)送,一個(gè)節(jié)點(diǎn)是一個(gè)Worker, 一個(gè)Bolt是一個(gè)task, 全部節(jié)點(diǎn)的Spout或Bolt的個(gè)數(shù)叫并發(fā)度。
Storm并發(fā)度設(shè)置:
1.Worker并發(fā)度:
首先按照集群規(guī)模和集群的物理位置來設(shè)定
一般會(huì)把Worker均分到每一個(gè)節(jié)點(diǎn)里, 一個(gè)supervisor默認(rèn)設(shè)置一個(gè)Worker
2.Spout數(shù)量設(shè)定:
Spout總數(shù)默認(rèn)等于Kafka(消息中間件)對(duì)應(yīng)Topic的分區(qū)數(shù),提高吞吐速度
一般一個(gè)Worker設(shè)置一個(gè)Spout
3.Bolt1數(shù)量設(shè)定:
首先根據(jù)數(shù)據(jù)量和處理數(shù)據(jù)的時(shí)間來設(shè)定
一般情況下, Bolt1的數(shù)量是Spout數(shù)量的2倍(根據(jù)項(xiàng)目進(jìn)行修改)
4.Bolt2數(shù)量設(shè)定:
首先根據(jù)數(shù)據(jù)量和處理數(shù)據(jù)的時(shí)間來設(shè)定,因?yàn)锽olt1傳過來的中間結(jié)果數(shù)據(jù)已經(jīng)減少很多,Bolt2的數(shù)量可以酌情減少。
容錯(cuò)機(jī)制:異或方式<相同為,不同為1>
tupleId - 產(chǎn)生新數(shù)據(jù),會(huì)產(chǎn)生一個(gè)tupleId;
整個(gè)過程中的tupleId按順序兩兩異或到最后
若結(jié)果為,則數(shù)據(jù)正確,否則錯(cuò)誤
messageId - 代表整條信息,API中指定提供給程序員,long型
rootId - 代表某條信息,提供給storm框架
出現(xiàn)數(shù)據(jù)運(yùn)算失敗的兩種情況:
execute(){
1.異常(數(shù)據(jù)異常)
2.任務(wù)運(yùn)行超時(shí) -- 認(rèn)為處理失敗
}
因?yàn)閿?shù)據(jù)發(fā)送時(shí)導(dǎo)致的數(shù)據(jù)重復(fù)發(fā)送問題, 如何解決?
Ⅰ.
1.比如對(duì)訂單信息做處理, 處理成功后, 把訂單信息ID存儲(chǔ)到Redis(set)
2.信息發(fā)送時(shí), 判斷是否處理過此信息
execute(){
if()
else()
}
Ⅱ.
不作處理: 點(diǎn)擊流日日志分析: pv, uv
指標(biāo)分析: 訂單人數(shù), 訂單金額
消息的可靠性保障和acker機(jī)制: open / nextTuple / ack / fail/ close
?、?Spout類:
在發(fā)送tuple時(shí),Spout會(huì)提供一個(gè)msgId,用于在后續(xù)識(shí)別tuple;Storm會(huì)根據(jù)msgId跟蹤創(chuàng)建的tuple樹,直到某個(gè)tuple被完整處理,根據(jù)msgId調(diào)用最初發(fā)送tuple的Spout中ack()方法,檢測(cè)到超時(shí)就調(diào)用fail()方法 -- 這兩個(gè)方法的調(diào)用必須由最初創(chuàng)建這個(gè)tuple的Spout執(zhí)行;當(dāng)Spout從消息隊(duì)列(Kafka/RocketMQ)中取出一條數(shù)據(jù)時(shí),實(shí)際上沒有被取出,而是保持一個(gè)掛起狀態(tài),等待消息完成的信號(hào),掛起狀態(tài)的信息不會(huì)被發(fā)送到其它的消費(fèi)者;當(dāng)該消息被"取出"時(shí),隊(duì)列會(huì)將消息體數(shù)據(jù)和一個(gè)唯一的msgId提供給客戶端,當(dāng)Spout的ack()/fail()方法被調(diào)用時(shí),Spout根據(jù)發(fā)送的id向隊(duì)列請(qǐng)求將消息從隊(duì)列中移除/重新放入隊(duì)列。
?、?acker任務(wù):
高效的實(shí)現(xiàn)可靠性 -- 必須顯式的在Bolt中調(diào)用定義在Spout中的ack()和fail()方法,Storm拓?fù)溆幸恍┨厥獾姆Q為"acker"的任務(wù),負(fù)責(zé)跟蹤Spout發(fā)送的tuple的DAG,當(dāng)一個(gè)acker發(fā)現(xiàn)DAG結(jié)束后,它就會(huì)給創(chuàng)建Spout tuple的Spout任務(wù)發(fā)送一條消息,讓這個(gè)任務(wù)來應(yīng)答這個(gè)消息。acker并不會(huì)直接的跟蹤tuple樹,在acker樹中存儲(chǔ)了一個(gè)表,用于將Spout tuple的id與一對(duì)值相映射,id為創(chuàng)建這個(gè)tuple的任務(wù)id,第二個(gè)值為一個(gè)64bit的數(shù)字(ack val),這個(gè)值是這棵樹中所有被創(chuàng)建的或者被應(yīng)答的tuple的tuple id進(jìn)行異或運(yùn)算的結(jié)果值。
?、?移除可靠性:
1.將 Config.TOPOLOGY_ACKERS 設(shè)置為
2.在SpoutOutputCollector.emit 方法中省略消息 id 來關(guān)閉 spout tuple 的跟蹤功能
3.在發(fā)送 tuple 的時(shí)候選擇發(fā)送“非錨定”的(unanchored)tuple
“Storm并發(fā)度怎么設(shè)置”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!