由前文知道每個BSPServiceWorker有一個WorkerServer對象,WorkerServer對象里面又有ServerData對象,作為數(shù)據(jù)實。ServerData中包含該Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。其中incomingMessageStore對象為MessageStoreByPartition(接口)類型,也就是說消息時按照分區(qū)來存儲的。MessageStoreByPartition接口的關(guān)系圖如下:
創(chuàng)新互聯(lián)服務(wù)項目包括福綿網(wǎng)站建設(shè)、福綿網(wǎng)站制作、福綿網(wǎng)頁制作以及福綿網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,福綿網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到福綿省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!在SimpleMessageStore抽象類中,有一個ConcurrentMap
《Giraph通信模塊分析》:http://my.oschina.net/skyaugust/blog/95182
每個頂點的消息列表具體為ExtendedDataOutput類型,它繼承DataOutput接口,增加了幾個方法而已。每個消息是以字節(jié)形式寫入到ExtendedDataOutput對象中的。
發(fā)送消息時,采用異步式通信。
圖頂點的計算處理與消息通信并發(fā)執(zhí)行,在計算過程中就可以發(fā)送消息,將大規(guī)模消息發(fā)送分散在不同的時間段,避免瞬時網(wǎng)絡(luò)通信阻塞,但是接受端需要額外的空間,存儲臨時接收到的消息,相當(dāng)于空間換時間。而集中式通信,圖頂點的計算處理與消息通信串行進行,在計算完畢后,統(tǒng)一發(fā)送消息,控制和實現(xiàn)方式簡單,可在發(fā)送端對消息進行大程度優(yōu)化,但容易造成瞬時間的網(wǎng)絡(luò)通信阻塞以及增加發(fā)送端的消息存儲開銷。
不同Worker間的消息通信使用RPC方式,具體為Netty。同一Worker內(nèi),連續(xù)兩次迭代的消息直接通過內(nèi)存操作,把要發(fā)送的消息直接復(fù)制到Worker的incomingMessageStore中。下面詳述消息的存儲格式和發(fā)送機制。
Giraph使用Cache來緩存消息,當(dāng)消息達到一定閾值后,一次性發(fā)送。
既按照bulk模式進行,不會一條一條信息發(fā)送。向某個頂點發(fā)送的消息是按照
功能:把<頂點ID,data> Pair 存儲在一個 byte數(shù)組中。里面有 ExtendedDataOutput對象用來存儲數(shù)據(jù)。
該類中還有一個內(nèi)部類:VertexIdDataIterator,該內(nèi)部類繼承 VertexIdIterator類。
org.apache.giraph.comm.SendCache用來緩存發(fā)送的信息,然后以“Bulk”模式發(fā)送。在Giraph中,每個Worker上可以對應(yīng)多個分區(qū)。消息緩存的閾值是以Worker為單位計算,而不是Partition。
SendCache中有ByteArrayVertexIdData[ ] dataCache數(shù)組用來存儲發(fā)送給每個Partition的消息;有int[ ] dataSizes數(shù)組用于記錄向每個Worker發(fā)送的消息大小,若大于MAX_MSG_REQUEST_SIZE(默認為512KB)就把此Worker上的所有Partition緩存的消息發(fā)送到給該Worker,同一Worker內(nèi)消息也是如此緩存;有int[ ] initBufferSizes數(shù)組用于記錄每個Worker上的每個Partition的初始化ByteArrayVertexIdData中ExtendedDataOutput對象的大小,同一Worker上的所有Partition初始值相同,該值為平均值。記MAX_MSG_REQUEST_SIZE(message request size)值為M, 該Worker上有P個 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)默認為0.2f,記為A。則每個Partition的初始大小為:M*(1+A) / P .
由前文知道,每個Worker都有一個NettyWorkerClientRequestProcessor用來發(fā)送消息。該類中有SendMessageCache對象用來緩存向外發(fā)送的信息。NettyWorkerClientRequestProcessor類中的sendMessageRequest(I,M)
方法如下,用于向某個頂點destVertexId發(fā)送消息message。
方法解釋:首先根據(jù)destVertexId得到對應(yīng)的partitionId和WorkerInfo,然后把消息add到SendMessageCache中,并返回向該頂點所屬Worker發(fā)送的消息大小workerMessageSize。若該值大于默認值512KB,則把此Worker對應(yīng)的所有Partition消息從SendMessageCache中刪除,把刪除的消息賦值給workerMessages,其類型為PairList
可以看到在發(fā)送消息時,先判斷是否在同一Worker上。如果是的話,調(diào)用SendWorkerMessagesRequest
org.apache.giraph.comm.requests.SendWorkerMessagesRequest類中的doRequest方法如下:
參數(shù)為該Worker的ServerData,代碼中的partitionVertexData實際為PairList
ByteArrayMessagesPerVertexStore類中的addPartitionMessages()方法如下:
當(dāng)用戶使用了Combiner,incomingMessageStore對應(yīng)的類型則為OneMessagePerVertexStore,該類為每個頂點只存儲一個消息,而非消息隊列。 結(jié)構(gòu)如下圖:
當(dāng)添加一條消息時,會把頂點已對應(yīng)的消息和要添加的消息調(diào)用combine()方法進行合并,然后存儲在上述結(jié)構(gòu)圖中。addPartitionMessages()方法如下:
在ComputeCallable中的call()方法調(diào)用computePartition(Partition)計算完所有Partition上的頂點后,調(diào)用WorkerClientRequestProcessor.flush()方法把所有剩余的消息發(fā)送出去。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機、免備案服務(wù)器”等云主機租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。